1use super::RocksDBProvider;
8use crate::StaticFileProviderFactory;
9use alloy_eips::eip2718::Encodable2718;
10use alloy_primitives::BlockNumber;
11use rayon::prelude::*;
12use reth_db_api::tables;
13use reth_stages_types::StageId;
14use reth_static_file_types::StaticFileSegment;
15use reth_storage_api::{
16 BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StageCheckpointReader,
17 StorageChangeSetReader, StorageSettingsCache, TransactionsProvider,
18};
19use reth_storage_errors::provider::ProviderResult;
20use std::collections::HashSet;
21
/// Number of blocks whose changesets are read and unwound per batch while healing the
/// `StoragesHistory` and `AccountsHistory` tables.
const HEAL_HISTORY_BATCH_SIZE: u64 = 10_000;
25
26impl RocksDBProvider {
27 pub fn check_consistency<Provider>(
49 &self,
50 provider: &Provider,
51 ) -> ProviderResult<Option<BlockNumber>>
52 where
53 Provider: DBProvider
54 + StageCheckpointReader
55 + StorageSettingsCache
56 + StaticFileProviderFactory
57 + BlockBodyIndicesProvider
58 + StorageChangeSetReader
59 + ChangeSetReader
60 + TransactionsProvider<Transaction: Encodable2718>,
61 {
62 let mut unwind_target: Option<BlockNumber> = None;
63
64 if provider.cached_storage_settings().storage_v2 &&
66 let Some(target) = self.heal_transaction_hash_numbers(provider)?
67 {
68 unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
69 }
70
71 if provider.cached_storage_settings().storage_v2 &&
73 let Some(target) = self.heal_storages_history(provider)?
74 {
75 unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
76 }
77
78 if provider.cached_storage_settings().storage_v2 &&
80 let Some(target) = self.heal_accounts_history(provider)?
81 {
82 unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
83 }
84
85 Ok(unwind_target)
86 }
87
88 fn heal_transaction_hash_numbers<Provider>(
95 &self,
96 provider: &Provider,
97 ) -> ProviderResult<Option<BlockNumber>>
98 where
99 Provider: DBProvider
100 + StageCheckpointReader
101 + StaticFileProviderFactory
102 + BlockBodyIndicesProvider
103 + TransactionsProvider<Transaction: Encodable2718>,
104 {
105 let checkpoint = provider
106 .get_stage_checkpoint(StageId::TransactionLookup)?
107 .map(|cp| cp.block_number)
108 .unwrap_or(0);
109
110 let sf_tip = provider
111 .static_file_provider()
112 .get_highest_static_file_block(StaticFileSegment::Transactions)
113 .unwrap_or(0);
114
115 if checkpoint == 0 {
117 tracing::info!(
118 target: "reth::providers::rocksdb",
119 "TransactionHashNumbers: checkpoint is 0, clearing stale data"
120 );
121 self.clear::<tables::TransactionHashNumbers>()?;
122 return Ok(None);
123 }
124
125 if sf_tip < checkpoint {
126 tracing::warn!(
130 target: "reth::providers::rocksdb",
131 sf_tip,
132 checkpoint,
133 "TransactionHashNumbers: static file tip behind checkpoint, unwind needed"
134 );
135 return Ok(Some(sf_tip));
136 }
137
138 if sf_tip == checkpoint {
140 return Ok(None);
141 }
142
143 let sf_tip_end_tx = provider
145 .static_file_provider()
146 .get_highest_static_file_tx(StaticFileSegment::Transactions)
147 .unwrap_or(0);
148
149 let checkpoint_next_tx = provider
151 .block_body_indices(checkpoint)?
152 .map(|indices| indices.next_tx_num())
153 .unwrap_or(0);
154
155 if sf_tip_end_tx < checkpoint_next_tx {
156 tracing::warn!(
160 target: "reth::providers::rocksdb",
161 sf_tip_end_tx,
162 checkpoint_next_tx,
163 checkpoint,
164 sf_tip,
165 "TransactionHashNumbers: static file tx tip behind checkpoint, unwind needed"
166 );
167 return Ok(Some(sf_tip));
168 }
169
170 tracing::info!(
171 target: "reth::providers::rocksdb",
172 checkpoint,
173 sf_tip,
174 checkpoint_next_tx,
175 sf_tip_end_tx,
176 "TransactionHashNumbers: healing via transaction ranges"
177 );
178
179 const BATCH_SIZE: u64 = 10_000;
180 let mut batch_start = checkpoint_next_tx;
181
182 while batch_start <= sf_tip_end_tx {
183 let batch_end = batch_start.saturating_add(BATCH_SIZE - 1).min(sf_tip_end_tx);
184
185 tracing::debug!(
186 target: "reth::providers::rocksdb",
187 batch_start,
188 batch_end,
189 "Pruning TransactionHashNumbers batch"
190 );
191
192 self.prune_transaction_hash_numbers_in_range(provider, batch_start..=batch_end)?;
193
194 batch_start = batch_end.saturating_add(1);
195 }
196
197 Ok(None)
198 }
199
200 fn prune_transaction_hash_numbers_in_range<Provider>(
213 &self,
214 provider: &Provider,
215 tx_range: std::ops::RangeInclusive<u64>,
216 ) -> ProviderResult<()>
217 where
218 Provider: TransactionsProvider<Transaction: Encodable2718>,
219 {
220 if tx_range.is_empty() {
221 return Ok(());
222 }
223
224 let hashes: Vec<_> = provider
226 .transactions_by_tx_range(tx_range.clone())?
227 .into_par_iter()
228 .map(|tx| tx.trie_hash())
229 .collect();
230
231 if !hashes.is_empty() {
232 tracing::info!(
233 target: "reth::providers::rocksdb",
234 deleted_count = hashes.len(),
235 tx_range_start = *tx_range.start(),
236 tx_range_end = *tx_range.end(),
237 "Pruning TransactionHashNumbers entries by tx range"
238 );
239
240 let mut batch = self.batch();
241 for hash in hashes {
242 batch.delete::<tables::TransactionHashNumbers>(hash)?;
243 }
244 batch.commit()?;
245 }
246
247 Ok(())
248 }
249
250 fn heal_storages_history<Provider>(
255 &self,
256 provider: &Provider,
257 ) -> ProviderResult<Option<BlockNumber>>
258 where
259 Provider:
260 DBProvider + StageCheckpointReader + StaticFileProviderFactory + StorageChangeSetReader,
261 {
262 let checkpoint = provider
263 .get_stage_checkpoint(StageId::IndexStorageHistory)?
264 .map(|cp| cp.block_number)
265 .unwrap_or(0);
266
267 if checkpoint == 0 {
269 tracing::info!(
270 target: "reth::providers::rocksdb",
271 "StoragesHistory: checkpoint is 0, clearing stale data"
272 );
273 self.clear::<tables::StoragesHistory>()?;
274 return Ok(None);
275 }
276
277 let sf_tip = provider
278 .static_file_provider()
279 .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
280 .unwrap_or(0);
281
282 if sf_tip < checkpoint {
283 tracing::warn!(
287 target: "reth::providers::rocksdb",
288 sf_tip,
289 checkpoint,
290 "StoragesHistory: static file tip behind checkpoint, unwind needed"
291 );
292 return Ok(Some(sf_tip));
293 }
294
295 if sf_tip == checkpoint {
296 return Ok(None);
297 }
298
299 let total_blocks = sf_tip - checkpoint;
300 tracing::info!(
301 target: "reth::providers::rocksdb",
302 checkpoint,
303 sf_tip,
304 total_blocks,
305 "StoragesHistory: healing via changesets"
306 );
307
308 let mut batch_start = checkpoint + 1;
309 let mut batch_num = 0u64;
310 let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
311
312 while batch_start <= sf_tip {
313 let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
314 batch_num += 1;
315
316 let changesets = provider.storage_changesets_range(batch_start..=batch_end)?;
317
318 let unique_keys: HashSet<_> = changesets
319 .into_iter()
320 .map(|(block_addr, entry)| (block_addr.address(), entry.key, checkpoint + 1))
321 .collect();
322 let indices: Vec<_> = unique_keys.into_iter().collect();
323
324 if !indices.is_empty() {
325 tracing::info!(
326 target: "reth::providers::rocksdb",
327 batch_num,
328 total_batches,
329 batch_start,
330 batch_end,
331 indices_count = indices.len(),
332 "StoragesHistory: unwinding batch"
333 );
334
335 let batch = self.unwind_storage_history_indices(&indices)?;
336 self.commit_batch(batch)?;
337 }
338
339 batch_start = batch_end + 1;
340 }
341
342 Ok(None)
343 }
344
345 fn heal_accounts_history<Provider>(
350 &self,
351 provider: &Provider,
352 ) -> ProviderResult<Option<BlockNumber>>
353 where
354 Provider: DBProvider + StageCheckpointReader + StaticFileProviderFactory + ChangeSetReader,
355 {
356 let checkpoint = provider
357 .get_stage_checkpoint(StageId::IndexAccountHistory)?
358 .map(|cp| cp.block_number)
359 .unwrap_or(0);
360
361 if checkpoint == 0 {
363 tracing::info!(
364 target: "reth::providers::rocksdb",
365 "AccountsHistory: checkpoint is 0, clearing stale data"
366 );
367 self.clear::<tables::AccountsHistory>()?;
368 return Ok(None);
369 }
370
371 let sf_tip = provider
372 .static_file_provider()
373 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
374 .unwrap_or(0);
375
376 if sf_tip < checkpoint {
377 tracing::warn!(
381 target: "reth::providers::rocksdb",
382 sf_tip,
383 checkpoint,
384 "AccountsHistory: static file tip behind checkpoint, unwind needed"
385 );
386 return Ok(Some(sf_tip));
387 }
388
389 if sf_tip == checkpoint {
390 return Ok(None);
391 }
392
393 let total_blocks = sf_tip - checkpoint;
394 tracing::info!(
395 target: "reth::providers::rocksdb",
396 checkpoint,
397 sf_tip,
398 total_blocks,
399 "AccountsHistory: healing via changesets"
400 );
401
402 let mut batch_start = checkpoint + 1;
403 let mut batch_num = 0u64;
404 let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
405
406 while batch_start <= sf_tip {
407 let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
408 batch_num += 1;
409
410 let changesets = provider.account_changesets_range(batch_start..=batch_end)?;
411
412 let mut addresses = HashSet::with_capacity(changesets.len());
413 addresses.extend(changesets.iter().map(|(_, cs)| cs.address));
414 let unwind_from = checkpoint + 1;
415 let indices: Vec<_> = addresses.into_iter().map(|addr| (addr, unwind_from)).collect();
416
417 if !indices.is_empty() {
418 tracing::info!(
419 target: "reth::providers::rocksdb",
420 batch_num,
421 total_batches,
422 batch_start,
423 batch_end,
424 indices_count = indices.len(),
425 "AccountsHistory: unwinding batch"
426 );
427
428 let batch = self.unwind_account_history_indices(&indices)?;
429 self.commit_batch(batch)?;
430 }
431
432 batch_start = batch_end + 1;
433 }
434
435 Ok(None)
436 }
437}
438
439#[cfg(test)]
440mod tests {
441 use super::*;
442 use crate::{
443 providers::{rocksdb::RocksDBBuilder, static_file::StaticFileWriter},
444 test_utils::create_test_provider_factory,
445 BlockWriter, DatabaseProviderFactory, StageCheckpointWriter, TransactionsProvider,
446 };
447 use alloy_primitives::{Address, B256};
448 use reth_db::cursor::{DbCursorRO, DbCursorRW};
449 use reth_db_api::{
450 models::{storage_sharded_key::StorageShardedKey, StorageSettings},
451 tables::{self, BlockNumberList},
452 transaction::DbTxMut,
453 };
454 use reth_stages_types::StageCheckpoint;
455 use reth_testing_utils::generators::{self, BlockRangeParams};
456 use tempfile::TempDir;
457
458 #[test]
459 fn test_first_last_empty_rocksdb() {
460 let temp_dir = TempDir::new().unwrap();
461 let provider = RocksDBBuilder::new(temp_dir.path())
462 .with_table::<tables::TransactionHashNumbers>()
463 .with_table::<tables::StoragesHistory>()
464 .build()
465 .unwrap();
466
467 let first = provider.first::<tables::TransactionHashNumbers>().unwrap();
469 let last = provider.last::<tables::TransactionHashNumbers>().unwrap();
470
471 assert!(first.is_none());
472 assert!(last.is_none());
473 }
474
475 #[test]
476 fn test_first_last_with_data() {
477 let temp_dir = TempDir::new().unwrap();
478 let provider = RocksDBBuilder::new(temp_dir.path())
479 .with_table::<tables::TransactionHashNumbers>()
480 .build()
481 .unwrap();
482
483 let tx_hash = B256::from([1u8; 32]);
485 provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
486
487 let last = provider.last::<tables::TransactionHashNumbers>().unwrap();
489 assert!(last.is_some());
490 assert_eq!(last.unwrap().1, 100);
491 }
492
493 #[test]
494 fn test_check_consistency_empty_rocksdb_no_checkpoint_is_ok() {
495 let temp_dir = TempDir::new().unwrap();
496 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
497
498 let factory = create_test_provider_factory();
500 factory.set_storage_settings_cache(StorageSettings::v2());
501
502 let provider = factory.database_provider_ro().unwrap();
503
504 let result = rocksdb.check_consistency(&provider).unwrap();
506 assert_eq!(result, None);
507 }
508
509 #[test]
514 fn test_check_consistency_checkpoint_zero_empty_rocksdb_returns_early() {
515 let temp_dir = TempDir::new().unwrap();
516 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
517
518 let factory = create_test_provider_factory();
519 factory.set_storage_settings_cache(StorageSettings::v2());
520
521 let provider = factory.database_provider_ro().unwrap();
524
525 let result = rocksdb.heal_transaction_hash_numbers(&provider).unwrap();
526 assert_eq!(result, None, "TransactionHashNumbers should return early at checkpoint 0");
527 assert!(rocksdb.first::<tables::TransactionHashNumbers>().unwrap().is_none());
528
529 let result = rocksdb.heal_storages_history(&provider).unwrap();
530 assert_eq!(result, None, "StoragesHistory should return early at checkpoint 0");
531 assert!(rocksdb.first::<tables::StoragesHistory>().unwrap().is_none());
532
533 let result = rocksdb.heal_accounts_history(&provider).unwrap();
534 assert_eq!(result, None, "AccountsHistory should return early at checkpoint 0");
535 assert!(rocksdb.first::<tables::AccountsHistory>().unwrap().is_none());
536 }
537
538 #[test]
539 fn test_check_consistency_empty_rocksdb_with_checkpoint_is_first_run() {
540 let temp_dir = TempDir::new().unwrap();
541 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
542
543 let factory = create_test_provider_factory();
545 factory.set_storage_settings_cache(StorageSettings::v2());
546
547 {
549 let provider = factory.database_provider_rw().unwrap();
550 provider
551 .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(100))
552 .unwrap();
553 provider.commit().unwrap();
554 }
555
556 let provider = factory.database_provider_ro().unwrap();
557
558 let result = rocksdb.check_consistency(&provider).unwrap();
561 assert_eq!(result, Some(0), "Static file tip (0) behind checkpoint (100) triggers unwind");
562 }
563
564 #[test]
567 fn test_check_consistency_checkpoint_zero_with_rocksdb_data_prunes_all() {
568 let temp_dir = TempDir::new().unwrap();
569 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
570
571 let factory = create_test_provider_factory();
572 factory.set_storage_settings_cache(StorageSettings::v2());
573
574 let mut rng = generators::rng();
576 let blocks = generators::random_block_range(
577 &mut rng,
578 0..=2,
579 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
580 );
581
582 let mut tx_hashes = Vec::new();
583 {
584 let provider = factory.database_provider_rw().unwrap();
585 let mut tx_count = 0u64;
586 for block in &blocks {
587 provider
588 .insert_block(&block.clone().try_recover().expect("recover block"))
589 .unwrap();
590 for tx in &block.body().transactions {
591 let hash = tx.trie_hash();
592 tx_hashes.push(hash);
593 rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
594 tx_count += 1;
595 }
596 }
597 provider.commit().unwrap();
598 }
599
600 {
602 let provider = factory.database_provider_rw().unwrap();
603 provider
604 .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(0))
605 .unwrap();
606 provider
607 .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
608 .unwrap();
609 provider
610 .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
611 .unwrap();
612 provider.commit().unwrap();
613 }
614
615 assert!(rocksdb.last::<tables::TransactionHashNumbers>().unwrap().is_some());
617
618 let provider = factory.database_provider_ro().unwrap();
619
620 let result = rocksdb.check_consistency(&provider).unwrap();
623 assert_eq!(result, None, "Should heal by clearing, no unwind needed");
624
625 for hash in &tx_hashes {
627 assert!(
628 rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
629 "RocksDB should be empty after pruning"
630 );
631 }
632 }
633
634 #[test]
635 fn test_check_consistency_storages_history_empty_with_checkpoint_is_first_run() {
636 let temp_dir = TempDir::new().unwrap();
637 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
638
639 let factory = create_test_provider_factory();
641 factory.set_storage_settings_cache(StorageSettings::v2());
642
643 {
645 let provider = factory.database_provider_rw().unwrap();
646 provider
647 .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
648 .unwrap();
649 provider.commit().unwrap();
650 }
651
652 let provider = factory.database_provider_ro().unwrap();
653
654 let result = rocksdb.check_consistency(&provider).unwrap();
658 assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
659 }
660
661 #[test]
662 fn test_check_consistency_storages_history_has_data_no_checkpoint_prunes_data() {
663 let temp_dir = TempDir::new().unwrap();
664 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
665
666 let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
668 let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
669 rocksdb.put::<tables::StoragesHistory>(key, &block_list).unwrap();
670
671 assert!(rocksdb.last::<tables::StoragesHistory>().unwrap().is_some());
673
674 let factory = create_test_provider_factory();
676 factory.set_storage_settings_cache(StorageSettings::v2());
677
678 let provider = factory.database_provider_ro().unwrap();
679
680 let result = rocksdb.check_consistency(&provider).unwrap();
683 assert_eq!(result, None, "Should heal by pruning, no unwind needed");
684
685 assert!(
687 rocksdb.last::<tables::StoragesHistory>().unwrap().is_none(),
688 "RocksDB should be empty after pruning"
689 );
690 }
691 #[test]
692 fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() {
693 let temp_dir = TempDir::new().unwrap();
694 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
695
696 let factory = create_test_provider_factory();
697 factory.set_storage_settings_cache(StorageSettings::v2());
698
699 let mut rng = generators::rng();
701 let blocks = generators::random_block_range(
702 &mut rng,
703 0..=2,
704 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
705 );
706
707 {
708 let provider = factory.database_provider_rw().unwrap();
709 let mut tx_count = 0u64;
710 for block in &blocks {
711 provider
712 .insert_block(&block.clone().try_recover().expect("recover block"))
713 .unwrap();
714 for tx in &block.body().transactions {
715 let hash = tx.trie_hash();
716 rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
717 tx_count += 1;
718 }
719 }
720 provider.commit().unwrap();
721 }
722
723 {
727 let provider = factory.database_provider_rw().unwrap();
728 provider
729 .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(10))
730 .unwrap();
731 provider
733 .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
734 .unwrap();
735 provider
736 .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
737 .unwrap();
738 provider.commit().unwrap();
739 }
740
741 let provider = factory.database_provider_ro().unwrap();
742
743 let result = rocksdb.check_consistency(&provider).unwrap();
745 assert_eq!(result, Some(2), "sf_tip < checkpoint requires unwind to sf_tip");
746 }
747
748 #[test]
749 fn test_check_consistency_rocksdb_ahead_of_checkpoint_prunes_excess() {
750 let temp_dir = TempDir::new().unwrap();
751 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
752
753 let factory = create_test_provider_factory();
755 factory.set_storage_settings_cache(StorageSettings::v2());
756
757 let mut rng = generators::rng();
760 let blocks = generators::random_block_range(
761 &mut rng,
762 0..=5,
763 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
764 );
765
766 let mut tx_hashes = Vec::new();
768 let mut tx_count = 0u64;
769 {
770 let provider = factory.database_provider_rw().unwrap();
771 for block in &blocks {
773 provider
774 .insert_block(&block.clone().try_recover().expect("recover block"))
775 .unwrap();
776 for tx in &block.body().transactions {
777 let hash = tx.trie_hash();
778 tx_hashes.push(hash);
779 rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
780 tx_count += 1;
781 }
782 }
783 provider.commit().unwrap();
784 }
785
786 {
790 let provider = factory.database_provider_rw().unwrap();
791 let mut cursor = provider.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();
795 let mut to_delete = Vec::new();
797 let mut walker = cursor.walk(Some(0)).unwrap();
798 while let Some((tx_num, block_num)) = walker.next().transpose().unwrap() {
799 if block_num > 2 {
800 to_delete.push(tx_num);
801 }
802 }
803 drop(walker);
804 for tx_num in to_delete {
805 cursor.seek_exact(tx_num).unwrap();
806 cursor.delete_current().unwrap();
807 }
808
809 provider
811 .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
812 .unwrap();
813 provider
815 .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
816 .unwrap();
817 provider
818 .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
819 .unwrap();
820 provider.commit().unwrap();
821 }
822
823 let provider = factory.database_provider_ro().unwrap();
824
825 let result = rocksdb.check_consistency(&provider).unwrap();
830 assert_eq!(result, None, "Should heal by pruning, no unwind needed");
831
832 for (i, hash) in tx_hashes.iter().take(6).enumerate() {
835 assert!(
836 rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_some(),
837 "tx {} should remain",
838 i
839 );
840 }
841 for (i, hash) in tx_hashes.iter().skip(6).enumerate() {
843 assert!(
844 rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
845 "tx {} should be pruned",
846 i + 6
847 );
848 }
849 }
850
851 #[test]
852 fn test_check_consistency_storages_history_sentinel_only_with_checkpoint_is_first_run() {
853 let temp_dir = TempDir::new().unwrap();
854 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
855
856 let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
859 let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
860 let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
861 rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
862 rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
863
864 assert!(rocksdb.first::<tables::StoragesHistory>().unwrap().is_some());
866
867 let factory = create_test_provider_factory();
869 factory.set_storage_settings_cache(StorageSettings::v2());
870
871 {
873 let provider = factory.database_provider_rw().unwrap();
874 provider
875 .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
876 .unwrap();
877 provider.commit().unwrap();
878 }
879
880 let provider = factory.database_provider_ro().unwrap();
881
882 let result = rocksdb.check_consistency(&provider).unwrap();
886 assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
887 }
888
889 #[test]
890 fn test_check_consistency_accounts_history_sentinel_only_with_checkpoint_is_first_run() {
891 use reth_db_api::models::ShardedKey;
892
893 let temp_dir = TempDir::new().unwrap();
894 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
895
896 let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
898 let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
899 let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
900 rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
901 rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
902
903 assert!(rocksdb.first::<tables::AccountsHistory>().unwrap().is_some());
905
906 let factory = create_test_provider_factory();
908 factory.set_storage_settings_cache(StorageSettings::v2());
909
910 {
912 let provider = factory.database_provider_rw().unwrap();
913 provider
914 .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
915 .unwrap();
916 provider.commit().unwrap();
917 }
918
919 let provider = factory.database_provider_ro().unwrap();
920
921 let result = rocksdb.check_consistency(&provider).unwrap();
925 assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
926 }
927
928 #[test]
932 fn test_prune_transaction_hash_numbers_by_range() {
933 let temp_dir = TempDir::new().unwrap();
934 let rocksdb = RocksDBBuilder::new(temp_dir.path())
935 .with_table::<tables::TransactionHashNumbers>()
936 .build()
937 .unwrap();
938
939 let factory = create_test_provider_factory();
941 factory.set_storage_settings_cache(StorageSettings::v2());
942
943 let mut rng = generators::rng();
947 let blocks = generators::random_block_range(
948 &mut rng,
949 0..=5,
950 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
951 );
952
953 let mut tx_count = 0u64;
955 let mut tx_hashes = Vec::new();
956 {
957 let provider = factory.database_provider_rw().unwrap();
958
959 for block in &blocks {
960 provider
961 .insert_block(&block.clone().try_recover().expect("recover block"))
962 .unwrap();
963
964 for tx in &block.body().transactions {
966 let hash = tx.trie_hash();
967 tx_hashes.push(hash);
968 rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
969 tx_count += 1;
970 }
971 }
972
973 provider
976 .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
977 .unwrap();
978 provider.commit().unwrap();
979 }
980
981 let rocksdb_count_before: usize =
988 rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().count();
989 assert_eq!(
990 rocksdb_count_before, tx_count as usize,
991 "RocksDB should have all {} transaction hashes before pruning",
992 tx_count
993 );
994
995 let provider = factory.database_provider_ro().unwrap();
996
997 let all_txs = provider.transactions_by_tx_range(0..tx_count).unwrap();
999 assert_eq!(all_txs.len(), tx_count as usize, "Should be able to fetch all transactions");
1000
1001 for (i, tx) in all_txs.iter().enumerate() {
1003 let computed_hash = tx.trie_hash();
1004 assert_eq!(
1005 computed_hash, tx_hashes[i],
1006 "Hash mismatch for tx {}: stored {:?} vs computed {:?}",
1007 i, tx_hashes[i], computed_hash
1008 );
1009 }
1010
1011 let max_tx_to_keep = 5u64;
1014 let tx_to_prune_start = max_tx_to_keep + 1;
1015
1016 rocksdb
1018 .prune_transaction_hash_numbers_in_range(&provider, tx_to_prune_start..=(tx_count - 1))
1019 .expect("prune should succeed");
1020
1021 let mut remaining_count = 0;
1023 for result in rocksdb.iter::<tables::TransactionHashNumbers>().unwrap() {
1024 let (_hash, tx_num) = result.unwrap();
1025 assert!(
1026 tx_num <= max_tx_to_keep,
1027 "Transaction {} should have been pruned (> {})",
1028 tx_num,
1029 max_tx_to_keep
1030 );
1031 remaining_count += 1;
1032 }
1033 assert_eq!(
1034 remaining_count,
1035 (max_tx_to_keep + 1) as usize,
1036 "Should have {} transactions (0-{})",
1037 max_tx_to_keep + 1,
1038 max_tx_to_keep
1039 );
1040 }
1041
1042 #[test]
1043 fn test_check_consistency_accounts_history_empty_with_checkpoint_is_first_run() {
1044 let temp_dir = TempDir::new().unwrap();
1045 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1046
1047 let factory = create_test_provider_factory();
1049 factory.set_storage_settings_cache(StorageSettings::v2());
1050
1051 {
1053 let provider = factory.database_provider_rw().unwrap();
1054 provider
1055 .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
1056 .unwrap();
1057 provider.commit().unwrap();
1058 }
1059
1060 let provider = factory.database_provider_ro().unwrap();
1061
1062 let result = rocksdb.check_consistency(&provider).unwrap();
1066 assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
1067 }
1068
1069 #[test]
1070 fn test_check_consistency_accounts_history_has_data_no_checkpoint_prunes_data() {
1071 use reth_db_api::models::ShardedKey;
1072
1073 let temp_dir = TempDir::new().unwrap();
1074 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1075
1076 let key = ShardedKey::new(Address::ZERO, 50);
1078 let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
1079 rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
1080
1081 assert!(rocksdb.last::<tables::AccountsHistory>().unwrap().is_some());
1083
1084 let factory = create_test_provider_factory();
1086 factory.set_storage_settings_cache(StorageSettings::v2());
1087
1088 let provider = factory.database_provider_ro().unwrap();
1089
1090 let result = rocksdb.check_consistency(&provider).unwrap();
1093 assert_eq!(result, None, "Should heal by pruning, no unwind needed");
1094
1095 assert!(
1097 rocksdb.last::<tables::AccountsHistory>().unwrap().is_none(),
1098 "RocksDB should be empty after pruning"
1099 );
1100 }
1101
1102 #[test]
1103 fn test_check_consistency_accounts_history_sf_tip_equals_checkpoint_no_action() {
1104 use reth_db::models::AccountBeforeTx;
1105 use reth_db_api::models::ShardedKey;
1106 use reth_static_file_types::StaticFileSegment;
1107
1108 let temp_dir = TempDir::new().unwrap();
1109 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1110
1111 let key1 = ShardedKey::new(Address::ZERO, 50);
1113 let key2 = ShardedKey::new(Address::random(), 75);
1114 let key3 = ShardedKey::new(Address::random(), u64::MAX); let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
1116 let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 75]);
1117 let block_list3 = BlockNumberList::new_pre_sorted([80, 90, 100]);
1118 rocksdb.put::<tables::AccountsHistory>(key1, &block_list1).unwrap();
1119 rocksdb.put::<tables::AccountsHistory>(key2, &block_list2).unwrap();
1120 rocksdb.put::<tables::AccountsHistory>(key3, &block_list3).unwrap();
1121
1122 let entries_before: Vec<_> =
1124 rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|r| r.unwrap()).collect();
1125 assert_eq!(entries_before.len(), 3, "Should have 3 entries before check");
1126
1127 let factory = create_test_provider_factory();
1129 factory.set_storage_settings_cache(StorageSettings::v2());
1130
1131 {
1133 let sf_provider = factory.static_file_provider();
1134 let mut writer =
1135 sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1136
1137 for block_num in 0..=100 {
1138 let changeset = vec![AccountBeforeTx { address: Address::random(), info: None }];
1139 writer.append_account_changeset(changeset, block_num).unwrap();
1140 }
1141
1142 writer.commit().unwrap();
1143 }
1144
1145 {
1147 let provider = factory.database_provider_rw().unwrap();
1148 provider
1149 .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
1150 .unwrap();
1151 provider.commit().unwrap();
1152 }
1153
1154 let provider = factory.database_provider_ro().unwrap();
1155
1156 let sf_tip = provider
1158 .static_file_provider()
1159 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
1160 .unwrap();
1161 assert_eq!(sf_tip, 100, "Static file tip should be 100");
1162
1163 let result = rocksdb.check_consistency(&provider).unwrap();
1165 assert_eq!(result, None, "sf_tip == checkpoint should not require unwind");
1166
1167 let entries_after: Vec<_> =
1169 rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|r| r.unwrap()).collect();
1170
1171 assert_eq!(
1172 entries_after.len(),
1173 entries_before.len(),
1174 "RocksDB entry count should be unchanged when sf_tip == checkpoint"
1175 );
1176
1177 for (before, after) in entries_before.iter().zip(entries_after.iter()) {
1179 assert_eq!(before.0.key, after.0.key, "Entry key should be unchanged");
1180 assert_eq!(
1181 before.0.highest_block_number, after.0.highest_block_number,
1182 "Entry highest_block_number should be unchanged"
1183 );
1184 assert_eq!(before.1, after.1, "Entry block list should be unchanged");
1185 }
1186 }
1187
/// Exercises `check_consistency` for `StoragesHistory` when RocksDB holds a long run of
/// entries above the `IndexStorageHistory` checkpoint.
///
/// Setup:
/// - storage changesets for blocks `0..TOTAL_BLOCKS` are written to static files, each one
///   touching an address/slot pair derived from the block number;
/// - the stage checkpoint is saved at `CHECKPOINT_BLOCK`;
/// - RocksDB is seeded with one single-block shard for every block above the checkpoint.
///
/// Expectation: the check returns `None` (no unwind target) and no shard with
/// `highest_block_number > CHECKPOINT_BLOCK` is left in RocksDB.
#[test]
fn test_check_consistency_storages_history_heals_via_changesets_large_range() {
    use alloy_primitives::U256;
    use reth_db_api::models::StorageBeforeTx;

    const TOTAL_BLOCKS: u64 = 15_000;
    const CHECKPOINT_BLOCK: u64 = 5_000;
    // Number of blocks strictly above the checkpoint that have a changeset.
    // NOTE(review): this is 9_999 blocks while `HEAL_HISTORY_BATCH_SIZE` is 10_000, so whether
    // more than one healing batch is exercised depends on how batches are anchored — confirm.
    const STALE_ENTRIES: u64 = TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1;

    /// Builds an address whose first eight bytes are the little-endian block number, so
    /// every block touches a different address.
    fn make_address(block_num: u64) -> Address {
        let mut raw = [0u8; 20];
        raw[..8].copy_from_slice(&block_num.to_le_bytes());
        Address::from(raw)
    }

    /// Builds a storage slot whose first eight bytes are the little-endian block number.
    fn make_slot(block_num: u64) -> B256 {
        let mut raw = [0u8; 32];
        raw[..8].copy_from_slice(&block_num.to_le_bytes());
        B256::from(raw)
    }

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Write one storage changeset per block into static files.
    {
        let sf_provider = factory.static_file_provider();
        let mut writer =
            sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();

        for block_num in 0..TOTAL_BLOCKS {
            // The writer takes the changeset by value, so build it fresh for each block
            // instead of cloning a reused buffer.
            let changeset = vec![StorageBeforeTx {
                address: make_address(block_num),
                key: make_slot(block_num),
                value: U256::from(block_num),
            }];
            writer.append_storage_changeset(changeset, block_num).unwrap();
        }

        writer.commit().unwrap();
    }

    // Sanity check: the static file tip is the last block written above.
    let highest = factory
        .static_file_provider()
        .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
        .unwrap();
    assert_eq!(highest, TOTAL_BLOCKS - 1, "Static files should have blocks 0..14999");

    // Record that storage history indexing only reached `CHECKPOINT_BLOCK`.
    {
        let provider_rw = factory.database_provider_rw().unwrap();
        provider_rw
            .save_stage_checkpoint(
                StageId::IndexStorageHistory,
                StageCheckpoint::new(CHECKPOINT_BLOCK),
            )
            .unwrap();
        provider_rw.commit().unwrap();
    }

    // Seed RocksDB with history shards for every block above the checkpoint.
    for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
        let shard_key =
            StorageShardedKey::new(make_address(block_num), make_slot(block_num), block_num);
        let blocks = BlockNumberList::new_pre_sorted([block_num]);
        rocksdb.put::<tables::StoragesHistory>(shard_key, &blocks).unwrap();
    }

    let count_before: usize = rocksdb.iter::<tables::StoragesHistory>().unwrap().count();
    assert_eq!(
        count_before,
        STALE_ENTRIES as usize,
        "Should have {} stale entries before healing",
        STALE_ENTRIES
    );

    let provider = factory.database_provider_ro().unwrap();
    let result = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(result, None, "Should heal via changesets, no unwind needed");

    // Every shard that still ends above the checkpoint counts as stale.
    let remaining_stale = rocksdb
        .iter::<tables::StoragesHistory>()
        .unwrap()
        .map(|entry| entry.unwrap())
        .filter(|(key, _)| key.sharded_key.highest_block_number > CHECKPOINT_BLOCK)
        .count();
    assert_eq!(
        remaining_stale, 0,
        "All stale entries (block > {}) should be pruned",
        CHECKPOINT_BLOCK
    );
}
1312
/// Checks that healing `StoragesHistory` keeps the shard that ends exactly at the
/// `IndexStorageHistory` checkpoint and removes a shard that only covers later blocks.
///
/// Static files hold storage changesets for blocks `0..=SF_TIP`: the checkpoint block touches
/// `checkpoint_addr`/`checkpoint_slot`, every later block touches `stale_addr`/`stale_slot`,
/// and earlier blocks touch the zero address and slot.
#[test]
fn test_check_consistency_storages_history_preserves_checkpoint_block() {
    use alloy_primitives::U256;
    use reth_db_api::models::StorageBeforeTx;
    use std::cmp::Ordering;

    const CHECKPOINT_BLOCK: u64 = 100;
    const SF_TIP: u64 = 200;

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let checkpoint_addr = Address::repeat_byte(0xAA);
    let checkpoint_slot = B256::repeat_byte(0xBB);
    let stale_addr = Address::repeat_byte(0xCC);
    let stale_slot = B256::repeat_byte(0xDD);

    // Populate static files with one changeset entry per block up to and including `SF_TIP`.
    {
        let static_files = factory.static_file_provider();
        let mut sf_writer =
            static_files.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();

        for block_num in 0..=SF_TIP {
            // Which account/slot the block touches depends on its position relative to the
            // checkpoint.
            let (address, key, value) = match block_num.cmp(&CHECKPOINT_BLOCK) {
                Ordering::Equal => (checkpoint_addr, checkpoint_slot, U256::from(block_num)),
                Ordering::Greater => (stale_addr, stale_slot, U256::from(block_num)),
                Ordering::Less => (Address::ZERO, B256::ZERO, U256::ZERO),
            };
            sf_writer
                .append_storage_changeset(vec![StorageBeforeTx { address, key, value }], block_num)
                .unwrap();
        }
        sf_writer.commit().unwrap();
    }

    // Persist the stage checkpoint at `CHECKPOINT_BLOCK`.
    {
        let provider_rw = factory.database_provider_rw().unwrap();
        let checkpoint = StageCheckpoint::new(CHECKPOINT_BLOCK);
        provider_rw.save_stage_checkpoint(StageId::IndexStorageHistory, checkpoint).unwrap();
        provider_rw.commit().unwrap();
    }

    // Shard ending exactly at the checkpoint: expected to survive.
    let checkpoint_key =
        StorageShardedKey::new(checkpoint_addr, checkpoint_slot, CHECKPOINT_BLOCK);
    rocksdb
        .put::<tables::StoragesHistory>(
            checkpoint_key.clone(),
            &BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]),
        )
        .unwrap();

    // Shard containing only blocks after the checkpoint: expected to be removed.
    let stale_key = StorageShardedKey::new(stale_addr, stale_slot, SF_TIP);
    rocksdb
        .put::<tables::StoragesHistory>(
            stale_key.clone(),
            &BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]),
        )
        .unwrap();

    let provider = factory.database_provider_ro().unwrap();
    let outcome = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(outcome, None, "Should heal without unwind");

    let kept = rocksdb.get::<tables::StoragesHistory>(checkpoint_key).unwrap();
    assert!(kept.is_some(), "Entry at checkpoint block should be preserved, not deleted");

    let removed = rocksdb.get::<tables::StoragesHistory>(stale_key).unwrap();
    assert!(removed.is_none(), "Stale entry after checkpoint should be removed");
}
1402
/// Exercises `check_consistency` for `AccountsHistory` when RocksDB holds a long run of
/// entries above the `IndexAccountHistory` checkpoint.
///
/// Setup:
/// - account changesets for blocks `0..TOTAL_BLOCKS` are written to static files, each one
///   touching an address derived from the block number;
/// - RocksDB is seeded with one single-block shard for every block above the checkpoint, plus
///   a handful of shards at or below the checkpoint;
/// - the stage checkpoint is saved at `CHECKPOINT_BLOCK`.
///
/// Expectation: the check returns `None` (no unwind target) and no shard with
/// `highest_block_number` above the checkpoint (other than the `u64::MAX` sentinel key) is
/// left in RocksDB.
#[test]
fn test_check_consistency_accounts_history_heals_via_changesets_large_range() {
    use reth_db::models::AccountBeforeTx;
    use reth_db_api::models::ShardedKey;
    use reth_static_file_types::StaticFileSegment;

    const TOTAL_BLOCKS: u64 = 15_000;
    const CHECKPOINT_BLOCK: u64 = 5_000;
    // Blocks at or below the checkpoint that also get a RocksDB shard.
    const VALID_BLOCKS: [u64; 5] = [100, 500, 1000, 2500, 5000];

    /// Builds an address whose first eight bytes are the little-endian block number, so
    /// every block touches a different account.
    fn make_address(block_num: u64) -> Address {
        let mut addr = Address::ZERO;
        addr.0[..8].copy_from_slice(&block_num.to_le_bytes());
        addr
    }

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Write one account changeset per block into static files.
    {
        let sf_provider = factory.static_file_provider();
        let mut writer =
            sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        for block_num in 0..TOTAL_BLOCKS {
            // The writer takes the changeset by value, so build it fresh for each block
            // instead of cloning a reused buffer.
            let changeset = vec![AccountBeforeTx { address: make_address(block_num), info: None }];
            writer.append_account_changeset(changeset, block_num).unwrap();
        }

        writer.commit().unwrap();
    }

    // Seed RocksDB: first the shards above the checkpoint, then the ones at or below it.
    // NOTE(review): the range above the checkpoint is 9_999 blocks while
    // `HEAL_HISTORY_BATCH_SIZE` is 10_000, so whether more than one healing batch is exercised
    // depends on how batches are anchored — confirm.
    for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
        let shard_key = ShardedKey::new(make_address(block_num), block_num);
        let blocks = BlockNumberList::new_pre_sorted([block_num]);
        rocksdb.put::<tables::AccountsHistory>(shard_key, &blocks).unwrap();
    }
    for block_num in VALID_BLOCKS {
        let shard_key = ShardedKey::new(make_address(block_num), block_num);
        let blocks = BlockNumberList::new_pre_sorted([block_num]);
        rocksdb.put::<tables::AccountsHistory>(shard_key, &blocks).unwrap();
    }

    let entries_before: usize = rocksdb.iter::<tables::AccountsHistory>().unwrap().count();
    let stale_count = (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize;
    let valid_count = VALID_BLOCKS.len();
    assert_eq!(
        entries_before,
        stale_count + valid_count,
        "Should have {} stale + {} valid entries before healing",
        stale_count,
        valid_count
    );

    // Record that account history indexing only reached `CHECKPOINT_BLOCK`.
    {
        let provider_rw = factory.database_provider_rw().unwrap();
        provider_rw
            .save_stage_checkpoint(
                StageId::IndexAccountHistory,
                StageCheckpoint::new(CHECKPOINT_BLOCK),
            )
            .unwrap();
        provider_rw.commit().unwrap();
    }

    let provider = factory.database_provider_ro().unwrap();

    // Sanity check: static files are ahead of the checkpoint.
    let sf_tip = provider
        .static_file_provider()
        .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
        .unwrap();
    assert_eq!(sf_tip, TOTAL_BLOCKS - 1, "Static file tip should be 14999");
    assert!(sf_tip > CHECKPOINT_BLOCK, "sf_tip should be > checkpoint to trigger healing");

    let result = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(result, None, "Healing should succeed without requiring unwind");

    // A shard is stale if it ends above the checkpoint; `u64::MAX` keys are excluded.
    let remaining_stale = rocksdb
        .iter::<tables::AccountsHistory>()
        .unwrap()
        .map(|entry| entry.unwrap())
        .filter(|(key, _)| {
            key.highest_block_number > CHECKPOINT_BLOCK && key.highest_block_number != u64::MAX
        })
        .count();
    assert_eq!(
        remaining_stale, 0,
        "All stale entries (block > {}) should be pruned",
        CHECKPOINT_BLOCK
    );
}
1527
/// Checks that healing `AccountsHistory` keeps the shard that ends exactly at the
/// `IndexAccountHistory` checkpoint and removes a shard that only covers later blocks.
///
/// Static files hold account changesets for blocks `0..=SF_TIP`: the checkpoint block touches
/// `checkpoint_addr`, every later block touches `stale_addr`, and earlier blocks touch the
/// zero address.
#[test]
fn test_check_consistency_accounts_history_preserves_checkpoint_block() {
    use reth_db::models::AccountBeforeTx;
    use reth_db_api::models::ShardedKey;
    use std::cmp::Ordering;

    const CHECKPOINT_BLOCK: u64 = 100;
    const SF_TIP: u64 = 200;

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let checkpoint_addr = Address::repeat_byte(0xAA);
    let stale_addr = Address::repeat_byte(0xCC);

    // Populate static files with one changeset entry per block up to and including `SF_TIP`.
    {
        let static_files = factory.static_file_provider();
        let mut sf_writer =
            static_files.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        for block_num in 0..=SF_TIP {
            // Which account the block touches depends on its position relative to the
            // checkpoint.
            let address = match block_num.cmp(&CHECKPOINT_BLOCK) {
                Ordering::Equal => checkpoint_addr,
                Ordering::Greater => stale_addr,
                Ordering::Less => Address::ZERO,
            };
            sf_writer
                .append_account_changeset(vec![AccountBeforeTx { address, info: None }], block_num)
                .unwrap();
        }
        sf_writer.commit().unwrap();
    }

    // Persist the stage checkpoint at `CHECKPOINT_BLOCK`.
    {
        let provider_rw = factory.database_provider_rw().unwrap();
        let checkpoint = StageCheckpoint::new(CHECKPOINT_BLOCK);
        provider_rw.save_stage_checkpoint(StageId::IndexAccountHistory, checkpoint).unwrap();
        provider_rw.commit().unwrap();
    }

    // Shard ending exactly at the checkpoint: expected to survive.
    let checkpoint_key = ShardedKey::new(checkpoint_addr, CHECKPOINT_BLOCK);
    rocksdb
        .put::<tables::AccountsHistory>(
            checkpoint_key.clone(),
            &BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]),
        )
        .unwrap();

    // Shard containing only blocks after the checkpoint: expected to be removed.
    let stale_key = ShardedKey::new(stale_addr, SF_TIP);
    rocksdb
        .put::<tables::AccountsHistory>(
            stale_key.clone(),
            &BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]),
        )
        .unwrap();

    let provider = factory.database_provider_ro().unwrap();
    let outcome = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(outcome, None, "Should heal without unwind");

    let kept = rocksdb.get::<tables::AccountsHistory>(checkpoint_key).unwrap();
    assert!(kept.is_some(), "Entry at checkpoint block should be preserved, not deleted");

    let removed = rocksdb.get::<tables::AccountsHistory>(stale_key).unwrap();
    assert!(removed.is_none(), "Stale entry after checkpoint should be removed");
}
1600
/// Checks that `check_consistency` leaves `StoragesHistory` untouched when the static file tip
/// for storage changesets equals the `IndexStorageHistory` checkpoint.
///
/// Two shards are written to RocksDB, storage changesets for blocks `0..=100` are written to
/// static files, and the checkpoint is saved at block 100. The test then expects no unwind
/// target and byte-for-byte identical RocksDB entries (same count, keys and block lists).
#[test]
fn test_check_consistency_storages_history_sf_tip_equals_checkpoint_no_action() {
    use alloy_primitives::U256;
    use reth_db::models::StorageBeforeTx;
    use reth_static_file_types::StaticFileSegment;

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    // Seed two history shards, both ending at or below the checkpoint saved further down.
    let seeded_shards = [
        (
            StorageShardedKey::new(Address::ZERO, B256::ZERO, 50),
            BlockNumberList::new_pre_sorted([10, 20, 30, 50]),
        ),
        (
            StorageShardedKey::new(Address::random(), B256::random(), 80),
            BlockNumberList::new_pre_sorted([40, 60, 80]),
        ),
    ];
    for (shard_key, blocks) in seeded_shards {
        rocksdb.put::<tables::StoragesHistory>(shard_key, &blocks).unwrap();
    }

    // Snapshot of the table before running the check.
    let entries_before: Vec<_> =
        rocksdb.iter::<tables::StoragesHistory>().unwrap().map(|row| row.unwrap()).collect();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Static files: one storage changeset per block for blocks 0..=100.
    {
        let static_files = factory.static_file_provider();
        let mut sf_writer =
            static_files.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();

        for block_num in 0..=100u64 {
            let entry = StorageBeforeTx {
                address: Address::ZERO,
                key: B256::with_last_byte(block_num as u8),
                value: U256::from(block_num),
            };
            sf_writer.append_storage_changeset(vec![entry], block_num).unwrap();
        }
        sf_writer.commit().unwrap();
    }

    // Stage checkpoint at the same height as the static file tip.
    {
        let provider_rw = factory.database_provider_rw().unwrap();
        let checkpoint = StageCheckpoint::new(100);
        provider_rw.save_stage_checkpoint(StageId::IndexStorageHistory, checkpoint).unwrap();
        provider_rw.commit().unwrap();
    }

    let provider = factory.database_provider_ro().unwrap();

    let sf_tip = provider
        .static_file_provider()
        .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
        .unwrap();
    assert_eq!(sf_tip, 100, "Static file tip should be 100");

    let outcome = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(outcome, None, "sf_tip == checkpoint should not require unwind");

    // Snapshot after the check; it must match the earlier one entry by entry.
    let entries_after: Vec<_> =
        rocksdb.iter::<tables::StoragesHistory>().unwrap().map(|row| row.unwrap()).collect();

    assert_eq!(
        entries_after.len(),
        entries_before.len(),
        "RocksDB entry count should be unchanged when sf_tip == checkpoint"
    );

    for ((key_before, list_before), (key_after, list_after)) in
        entries_before.iter().zip(&entries_after)
    {
        assert_eq!(key_before, key_after, "Entry key should be unchanged");
        assert_eq!(list_before, list_after, "Entry block list should be unchanged");
    }
}
1681}