1use super::RocksDBProvider;
8use crate::StaticFileProviderFactory;
9use alloy_eips::eip2718::Encodable2718;
10use alloy_primitives::BlockNumber;
11use rayon::prelude::*;
12use reth_db_api::tables;
13use reth_stages_types::StageId;
14use reth_static_file_types::StaticFileSegment;
15use reth_storage_api::{
16 BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StageCheckpointReader,
17 StorageChangeSetReader, StorageSettingsCache, TransactionsProvider,
18};
19use reth_storage_errors::provider::ProviderResult;
20use std::collections::HashSet;
21
/// Number of blocks covered by each changeset batch that `heal_storages_history` and
/// `heal_accounts_history` read and unwind in one iteration.
const HEAL_HISTORY_BATCH_SIZE: u64 = 10_000;
25
26impl RocksDBProvider {
27 pub fn check_consistency<Provider>(
49 &self,
50 provider: &Provider,
51 ) -> ProviderResult<Option<BlockNumber>>
52 where
53 Provider: DBProvider
54 + StageCheckpointReader
55 + StorageSettingsCache
56 + StaticFileProviderFactory
57 + BlockBodyIndicesProvider
58 + StorageChangeSetReader
59 + ChangeSetReader
60 + TransactionsProvider<Transaction: Encodable2718>,
61 {
62 let mut unwind_target: Option<BlockNumber> = None;
63
64 if provider.cached_storage_settings().storage_v2 &&
66 let Some(target) = self.heal_transaction_hash_numbers(provider)?
67 {
68 unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
69 }
70
71 if provider.cached_storage_settings().storage_v2 &&
73 let Some(target) = self.heal_storages_history(provider)?
74 {
75 unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
76 }
77
78 if provider.cached_storage_settings().storage_v2 &&
80 let Some(target) = self.heal_accounts_history(provider)?
81 {
82 unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
83 }
84
85 Ok(unwind_target)
86 }
87
88 fn heal_transaction_hash_numbers<Provider>(
95 &self,
96 provider: &Provider,
97 ) -> ProviderResult<Option<BlockNumber>>
98 where
99 Provider: DBProvider
100 + StageCheckpointReader
101 + StaticFileProviderFactory
102 + BlockBodyIndicesProvider
103 + TransactionsProvider<Transaction: Encodable2718>,
104 {
105 let checkpoint = provider
106 .get_stage_checkpoint(StageId::TransactionLookup)?
107 .map(|cp| cp.block_number)
108 .unwrap_or(0);
109
110 let sf_tip = provider
111 .static_file_provider()
112 .get_highest_static_file_block(StaticFileSegment::Transactions)
113 .unwrap_or(0);
114
115 if checkpoint == 0 && self.first::<tables::TransactionHashNumbers>()?.is_some() {
117 tracing::info!(
118 target: "reth::providers::rocksdb",
119 "TransactionHashNumbers has data but checkpoint is 0, clearing all"
120 );
121 self.clear::<tables::TransactionHashNumbers>()?;
122 return Ok(None);
123 }
124
125 if sf_tip < checkpoint {
126 tracing::warn!(
130 target: "reth::providers::rocksdb",
131 sf_tip,
132 checkpoint,
133 "TransactionHashNumbers: static file tip behind checkpoint, unwind needed"
134 );
135 return Ok(Some(sf_tip));
136 }
137
138 if sf_tip == checkpoint {
140 return Ok(None);
141 }
142
143 let sf_tip_end_tx = provider
145 .static_file_provider()
146 .get_highest_static_file_tx(StaticFileSegment::Transactions)
147 .unwrap_or(0);
148
149 let checkpoint_next_tx = provider
151 .block_body_indices(checkpoint)?
152 .map(|indices| indices.next_tx_num())
153 .unwrap_or(0);
154
155 if sf_tip_end_tx < checkpoint_next_tx {
156 tracing::warn!(
160 target: "reth::providers::rocksdb",
161 sf_tip_end_tx,
162 checkpoint_next_tx,
163 checkpoint,
164 sf_tip,
165 "TransactionHashNumbers: static file tx tip behind checkpoint, unwind needed"
166 );
167 return Ok(Some(sf_tip));
168 }
169
170 tracing::info!(
171 target: "reth::providers::rocksdb",
172 checkpoint,
173 sf_tip,
174 checkpoint_next_tx,
175 sf_tip_end_tx,
176 "TransactionHashNumbers: healing via transaction ranges"
177 );
178
179 const BATCH_SIZE: u64 = 10_000;
180 let mut batch_start = checkpoint_next_tx;
181
182 while batch_start <= sf_tip_end_tx {
183 let batch_end = batch_start.saturating_add(BATCH_SIZE - 1).min(sf_tip_end_tx);
184
185 tracing::debug!(
186 target: "reth::providers::rocksdb",
187 batch_start,
188 batch_end,
189 "Pruning TransactionHashNumbers batch"
190 );
191
192 self.prune_transaction_hash_numbers_in_range(provider, batch_start..=batch_end)?;
193
194 batch_start = batch_end.saturating_add(1);
195 }
196
197 Ok(None)
198 }
199
200 fn prune_transaction_hash_numbers_in_range<Provider>(
213 &self,
214 provider: &Provider,
215 tx_range: std::ops::RangeInclusive<u64>,
216 ) -> ProviderResult<()>
217 where
218 Provider: TransactionsProvider<Transaction: Encodable2718>,
219 {
220 if tx_range.is_empty() {
221 return Ok(());
222 }
223
224 let hashes: Vec<_> = provider
226 .transactions_by_tx_range(tx_range.clone())?
227 .into_par_iter()
228 .map(|tx| tx.trie_hash())
229 .collect();
230
231 if !hashes.is_empty() {
232 tracing::info!(
233 target: "reth::providers::rocksdb",
234 deleted_count = hashes.len(),
235 tx_range_start = *tx_range.start(),
236 tx_range_end = *tx_range.end(),
237 "Pruning TransactionHashNumbers entries by tx range"
238 );
239
240 let mut batch = self.batch();
241 for hash in hashes {
242 batch.delete::<tables::TransactionHashNumbers>(hash)?;
243 }
244 batch.commit()?;
245 }
246
247 Ok(())
248 }
249
250 fn heal_storages_history<Provider>(
255 &self,
256 provider: &Provider,
257 ) -> ProviderResult<Option<BlockNumber>>
258 where
259 Provider:
260 DBProvider + StageCheckpointReader + StaticFileProviderFactory + StorageChangeSetReader,
261 {
262 let checkpoint = provider
263 .get_stage_checkpoint(StageId::IndexStorageHistory)?
264 .map(|cp| cp.block_number)
265 .unwrap_or(0);
266
267 if checkpoint == 0 && self.first::<tables::StoragesHistory>()?.is_some() {
269 tracing::info!(
270 target: "reth::providers::rocksdb",
271 "StoragesHistory has data but checkpoint is 0, clearing all"
272 );
273 self.clear::<tables::StoragesHistory>()?;
274 return Ok(None);
275 }
276
277 let sf_tip = provider
278 .static_file_provider()
279 .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
280 .unwrap_or(0);
281
282 if sf_tip < checkpoint {
283 tracing::warn!(
287 target: "reth::providers::rocksdb",
288 sf_tip,
289 checkpoint,
290 "StoragesHistory: static file tip behind checkpoint, unwind needed"
291 );
292 return Ok(Some(sf_tip));
293 }
294
295 if sf_tip == checkpoint {
296 return Ok(None);
297 }
298
299 let total_blocks = sf_tip - checkpoint;
300 tracing::info!(
301 target: "reth::providers::rocksdb",
302 checkpoint,
303 sf_tip,
304 total_blocks,
305 "StoragesHistory: healing via changesets"
306 );
307
308 let mut batch_start = checkpoint + 1;
309 let mut batch_num = 0u64;
310 let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
311
312 while batch_start <= sf_tip {
313 let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
314 batch_num += 1;
315
316 let changesets = provider.storage_changesets_range(batch_start..=batch_end)?;
317
318 let unique_keys: HashSet<_> = changesets
319 .into_iter()
320 .map(|(block_addr, entry)| {
321 (block_addr.address(), entry.key.as_b256(), checkpoint + 1)
323 })
324 .collect();
325 let indices: Vec<_> = unique_keys.into_iter().collect();
326
327 if !indices.is_empty() {
328 tracing::info!(
329 target: "reth::providers::rocksdb",
330 batch_num,
331 total_batches,
332 batch_start,
333 batch_end,
334 indices_count = indices.len(),
335 "StoragesHistory: unwinding batch"
336 );
337
338 let batch = self.unwind_storage_history_indices(&indices)?;
339 self.commit_batch(batch)?;
340 }
341
342 batch_start = batch_end + 1;
343 }
344
345 Ok(None)
346 }
347
348 fn heal_accounts_history<Provider>(
353 &self,
354 provider: &Provider,
355 ) -> ProviderResult<Option<BlockNumber>>
356 where
357 Provider: DBProvider + StageCheckpointReader + StaticFileProviderFactory + ChangeSetReader,
358 {
359 let checkpoint = provider
360 .get_stage_checkpoint(StageId::IndexAccountHistory)?
361 .map(|cp| cp.block_number)
362 .unwrap_or(0);
363
364 if checkpoint == 0 && self.first::<tables::AccountsHistory>()?.is_some() {
366 tracing::info!(
367 target: "reth::providers::rocksdb",
368 "AccountsHistory has data but checkpoint is 0, clearing all"
369 );
370 self.clear::<tables::AccountsHistory>()?;
371 return Ok(None);
372 }
373
374 let sf_tip = provider
375 .static_file_provider()
376 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
377 .unwrap_or(0);
378
379 if sf_tip < checkpoint {
380 tracing::warn!(
384 target: "reth::providers::rocksdb",
385 sf_tip,
386 checkpoint,
387 "AccountsHistory: static file tip behind checkpoint, unwind needed"
388 );
389 return Ok(Some(sf_tip));
390 }
391
392 if sf_tip == checkpoint {
393 return Ok(None);
394 }
395
396 let total_blocks = sf_tip - checkpoint;
397 tracing::info!(
398 target: "reth::providers::rocksdb",
399 checkpoint,
400 sf_tip,
401 total_blocks,
402 "AccountsHistory: healing via changesets"
403 );
404
405 let mut batch_start = checkpoint + 1;
406 let mut batch_num = 0u64;
407 let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
408
409 while batch_start <= sf_tip {
410 let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
411 batch_num += 1;
412
413 let changesets = provider.account_changesets_range(batch_start..=batch_end)?;
414
415 let mut addresses = HashSet::with_capacity(changesets.len());
416 addresses.extend(changesets.iter().map(|(_, cs)| cs.address));
417 let unwind_from = checkpoint + 1;
418 let indices: Vec<_> = addresses.into_iter().map(|addr| (addr, unwind_from)).collect();
419
420 if !indices.is_empty() {
421 tracing::info!(
422 target: "reth::providers::rocksdb",
423 batch_num,
424 total_batches,
425 batch_start,
426 batch_end,
427 indices_count = indices.len(),
428 "AccountsHistory: unwinding batch"
429 );
430
431 let batch = self.unwind_account_history_indices(&indices)?;
432 self.commit_batch(batch)?;
433 }
434
435 batch_start = batch_end + 1;
436 }
437
438 Ok(None)
439 }
440}
441
442#[cfg(test)]
443mod tests {
444 use super::*;
445 use crate::{
446 providers::{rocksdb::RocksDBBuilder, static_file::StaticFileWriter},
447 test_utils::create_test_provider_factory,
448 BlockWriter, DatabaseProviderFactory, StageCheckpointWriter, TransactionsProvider,
449 };
450 use alloy_primitives::{Address, B256};
451 use reth_db::cursor::{DbCursorRO, DbCursorRW};
452 use reth_db_api::{
453 models::{storage_sharded_key::StorageShardedKey, StorageSettings},
454 tables::{self, BlockNumberList},
455 transaction::DbTxMut,
456 };
457 use reth_stages_types::StageCheckpoint;
458 use reth_testing_utils::generators::{self, BlockRangeParams};
459 use tempfile::TempDir;
460
/// `first` and `last` both return `None` on a freshly built, empty table.
#[test]
fn test_first_last_empty_rocksdb() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .with_table::<tables::StoragesHistory>()
        .build()
        .unwrap();

    let first_entry = db.first::<tables::TransactionHashNumbers>().unwrap();
    let last_entry = db.last::<tables::TransactionHashNumbers>().unwrap();

    assert!(first_entry.is_none());
    assert!(last_entry.is_none());
}
477
/// After a single `put`, `last` returns that entry with the stored value.
#[test]
fn test_first_last_with_data() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .build()
        .unwrap();

    db.put::<tables::TransactionHashNumbers>(B256::from([1u8; 32]), &100).unwrap();

    let newest = db.last::<tables::TransactionHashNumbers>().unwrap();
    assert!(newest.is_some());
    let (_, tx_number) = newest.unwrap();
    assert_eq!(tx_number, 100);
}
495
/// Empty RocksDB and no stage checkpoints: the check reports no unwind target.
#[test]
fn test_check_consistency_empty_rocksdb_no_checkpoint_is_ok() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let ro = factory.database_provider_ro().unwrap();
    let outcome = db.check_consistency(&ro).unwrap();

    assert_eq!(outcome, None);
}
511
/// A `TransactionLookup` checkpoint at block 100 with nothing else written makes the check
/// return `Some(0)` as the unwind target.
#[test]
fn test_check_consistency_empty_rocksdb_with_checkpoint_is_first_run() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Persist the checkpoint in its own read-write transaction.
    {
        let rw = factory.database_provider_rw().unwrap();
        let lookup_checkpoint = StageCheckpoint::new(100);
        rw.save_stage_checkpoint(StageId::TransactionLookup, lookup_checkpoint).unwrap();
        rw.commit().unwrap();
    }

    let ro = factory.database_provider_ro().unwrap();
    let outcome = db.check_consistency(&ro).unwrap();

    assert_eq!(outcome, Some(0), "Static file tip (0) behind checkpoint (100) triggers unwind");
}
537
/// With every relevant checkpoint at 0, pre-existing `TransactionHashNumbers` entries are
/// cleared and no unwind is requested.
#[test]
fn test_check_consistency_checkpoint_zero_with_rocksdb_data_prunes_all() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let mut rng = generators::rng();
    let params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
    let blocks = generators::random_block_range(&mut rng, 0..=2, params);

    // Insert the blocks and mirror each transaction hash into RocksDB, numbering the
    // transactions sequentially across blocks.
    let mut tx_hashes = Vec::new();
    {
        let rw = factory.database_provider_rw().unwrap();
        let mut next_tx_number = 0u64;
        for block in &blocks {
            let recovered = block.clone().try_recover().expect("recover block");
            rw.insert_block(&recovered).unwrap();
            for tx in &block.body().transactions {
                let hash = tx.trie_hash();
                db.put::<tables::TransactionHashNumbers>(hash, &next_tx_number).unwrap();
                tx_hashes.push(hash);
                next_tx_number += 1;
            }
        }
        rw.commit().unwrap();
    }

    // Explicitly pin all three stage checkpoints to block 0.
    {
        let rw = factory.database_provider_rw().unwrap();
        rw.save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(0)).unwrap();
        rw.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0)).unwrap();
        rw.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0)).unwrap();
        rw.commit().unwrap();
    }

    assert!(db.last::<tables::TransactionHashNumbers>().unwrap().is_some());

    let ro = factory.database_provider_ro().unwrap();
    let outcome = db.check_consistency(&ro).unwrap();
    assert_eq!(outcome, None, "Should heal by clearing, no unwind needed");

    for stale_hash in &tx_hashes {
        assert!(
            db.get::<tables::TransactionHashNumbers>(*stale_hash).unwrap().is_none(),
            "RocksDB should be empty after pruning"
        );
    }
}
607
/// An `IndexStorageHistory` checkpoint at block 100 with no static file data makes the check
/// return `Some(0)` as the unwind target.
#[test]
fn test_check_consistency_storages_history_empty_with_checkpoint_is_first_run() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Persist the checkpoint in its own read-write transaction.
    {
        let rw = factory.database_provider_rw().unwrap();
        let history_checkpoint = StageCheckpoint::new(100);
        rw.save_stage_checkpoint(StageId::IndexStorageHistory, history_checkpoint).unwrap();
        rw.commit().unwrap();
    }

    let ro = factory.database_provider_ro().unwrap();
    let outcome = db.check_consistency(&ro).unwrap();

    assert_eq!(outcome, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
}
634
/// `StoragesHistory` data without any checkpoint is cleared and no unwind is requested.
#[test]
fn test_check_consistency_storages_history_has_data_no_checkpoint_prunes_data() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    // Seed a single shard so the table is non-empty.
    db.put::<tables::StoragesHistory>(
        StorageShardedKey::new(Address::ZERO, B256::ZERO, 50),
        &BlockNumberList::new_pre_sorted([10, 20, 30, 50]),
    )
    .unwrap();
    assert!(db.last::<tables::StoragesHistory>().unwrap().is_some());

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let ro = factory.database_provider_ro().unwrap();
    let outcome = db.check_consistency(&ro).unwrap();
    assert_eq!(outcome, None, "Should heal by pruning, no unwind needed");

    let leftover = db.last::<tables::StoragesHistory>().unwrap();
    assert!(leftover.is_none(), "RocksDB should be empty after pruning");
}
/// A `TransactionLookup` checkpoint (block 10) ahead of the inserted blocks (0..=2) makes the
/// check return `Some(2)` as the unwind target.
#[test]
fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let mut rng = generators::rng();
    let params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
    let blocks = generators::random_block_range(&mut rng, 0..=2, params);

    // Insert the blocks and mirror each transaction hash into RocksDB, numbering the
    // transactions sequentially across blocks.
    {
        let rw = factory.database_provider_rw().unwrap();
        let mut next_tx_number = 0u64;
        for block in &blocks {
            let recovered = block.clone().try_recover().expect("recover block");
            rw.insert_block(&recovered).unwrap();
            for tx in &block.body().transactions {
                db.put::<tables::TransactionHashNumbers>(tx.trie_hash(), &next_tx_number)
                    .unwrap();
                next_tx_number += 1;
            }
        }
        rw.commit().unwrap();
    }

    // The lookup checkpoint points past the inserted blocks; the history checkpoints are 0.
    {
        let rw = factory.database_provider_rw().unwrap();
        rw.save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(10)).unwrap();
        rw.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0)).unwrap();
        rw.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0)).unwrap();
        rw.commit().unwrap();
    }

    let ro = factory.database_provider_ro().unwrap();
    let outcome = db.check_consistency(&ro).unwrap();

    assert_eq!(outcome, Some(2), "sf_tip < checkpoint requires unwind to sf_tip");
}
721
/// With the `TransactionLookup` checkpoint at block 2 and blocks 0..=5 inserted, the check
/// heals in place: the first six transaction hashes stay, the remaining ones are pruned.
#[test]
fn test_check_consistency_rocksdb_ahead_of_checkpoint_prunes_excess() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let mut rng = generators::rng();
    let params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
    let blocks = generators::random_block_range(&mut rng, 0..=5, params);

    // Insert the blocks and mirror each transaction hash into RocksDB, numbering the
    // transactions sequentially across blocks.
    let mut tx_hashes = Vec::new();
    let mut next_tx_number = 0u64;
    {
        let rw = factory.database_provider_rw().unwrap();
        for block in &blocks {
            let recovered = block.clone().try_recover().expect("recover block");
            rw.insert_block(&recovered).unwrap();
            for tx in &block.body().transactions {
                let hash = tx.trie_hash();
                tx_hashes.push(hash);
                db.put::<tables::TransactionHashNumbers>(hash, &next_tx_number).unwrap();
                next_tx_number += 1;
            }
        }
        rw.commit().unwrap();
    }

    {
        let rw = factory.database_provider_rw().unwrap();
        let mut cursor = rw.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();

        // Collect the `TransactionBlocks` keys whose block is above 2. The walker borrows the
        // cursor, so it lives in its own scope and is gone before the deletions start.
        let mut stale_tx_numbers = Vec::new();
        {
            let mut walker = cursor.walk(Some(0)).unwrap();
            while let Some(entry) = walker.next() {
                let (tx_num, block_num) = entry.unwrap();
                if block_num > 2 {
                    stale_tx_numbers.push(tx_num);
                }
            }
        }
        for tx_num in stale_tx_numbers {
            cursor.seek_exact(tx_num).unwrap();
            cursor.delete_current().unwrap();
        }

        rw.save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2)).unwrap();
        rw.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0)).unwrap();
        rw.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0)).unwrap();
        rw.commit().unwrap();
    }

    let ro = factory.database_provider_ro().unwrap();
    let outcome = db.check_consistency(&ro).unwrap();
    assert_eq!(outcome, None, "Should heal by pruning, no unwind needed");

    // Transactions 0..6 must survive; everything after them must be gone.
    for (i, hash) in tx_hashes.iter().enumerate() {
        let stored = db.get::<tables::TransactionHashNumbers>(*hash).unwrap();
        if i < 6 {
            assert!(stored.is_some(), "tx {} should remain", i);
        } else {
            assert!(stored.is_none(), "tx {} should be pruned", i);
        }
    }
}
824
/// `StoragesHistory` holds only shards keyed with `u64::MAX`, and the `IndexStorageHistory`
/// checkpoint is at block 100 with no static file data: the check returns `Some(0)`.
#[test]
fn test_check_consistency_storages_history_sentinel_only_with_checkpoint_is_first_run() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    // Two shards whose highest block number is `u64::MAX`, sharing the same block list.
    let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
    let sentinel_keys = [
        StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX),
        StorageShardedKey::new(Address::random(), B256::random(), u64::MAX),
    ];
    for sentinel_key in sentinel_keys {
        db.put::<tables::StoragesHistory>(sentinel_key, &block_list).unwrap();
    }

    assert!(db.first::<tables::StoragesHistory>().unwrap().is_some());

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    {
        let rw = factory.database_provider_rw().unwrap();
        let history_checkpoint = StageCheckpoint::new(100);
        rw.save_stage_checkpoint(StageId::IndexStorageHistory, history_checkpoint).unwrap();
        rw.commit().unwrap();
    }

    let ro = factory.database_provider_ro().unwrap();
    let outcome = db.check_consistency(&ro).unwrap();

    assert_eq!(outcome, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
}
862
/// `AccountsHistory` holds only shards keyed with `u64::MAX`, and the `IndexAccountHistory`
/// checkpoint is at block 100 with no static file data: the check returns `Some(0)`.
#[test]
fn test_check_consistency_accounts_history_sentinel_only_with_checkpoint_is_first_run() {
    use reth_db_api::models::ShardedKey;

    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    // Two shards whose highest block number is `u64::MAX`, sharing the same block list.
    let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
    let sentinel_keys =
        [ShardedKey::new(Address::ZERO, u64::MAX), ShardedKey::new(Address::random(), u64::MAX)];
    for sentinel_key in sentinel_keys {
        db.put::<tables::AccountsHistory>(sentinel_key, &block_list).unwrap();
    }

    assert!(db.first::<tables::AccountsHistory>().unwrap().is_some());

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    {
        let rw = factory.database_provider_rw().unwrap();
        let history_checkpoint = StageCheckpoint::new(100);
        rw.save_stage_checkpoint(StageId::IndexAccountHistory, history_checkpoint).unwrap();
        rw.commit().unwrap();
    }

    let ro = factory.database_provider_ro().unwrap();
    let outcome = db.check_consistency(&ro).unwrap();

    assert_eq!(outcome, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
}
901
/// Calls `prune_transaction_hash_numbers_in_range` directly: after pruning transaction numbers
/// 6 through the last one, only entries for transactions 0..=5 remain in RocksDB.
#[test]
fn test_prune_transaction_hash_numbers_by_range() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .build()
        .unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let mut rng = generators::rng();
    let params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
    let blocks = generators::random_block_range(&mut rng, 0..=5, params);

    // Insert the blocks and mirror each transaction hash into RocksDB, numbering the
    // transactions sequentially across blocks.
    let mut tx_count = 0u64;
    let mut tx_hashes = Vec::new();
    {
        let rw = factory.database_provider_rw().unwrap();

        for block in &blocks {
            let recovered = block.clone().try_recover().expect("recover block");
            rw.insert_block(&recovered).unwrap();

            for tx in &block.body().transactions {
                let hash = tx.trie_hash();
                tx_hashes.push(hash);
                db.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                tx_count += 1;
            }
        }

        rw.save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2)).unwrap();
        rw.commit().unwrap();
    }

    let rocksdb_count_before: usize = db.iter::<tables::TransactionHashNumbers>().unwrap().count();
    assert_eq!(
        rocksdb_count_before, tx_count as usize,
        "RocksDB should have all {} transaction hashes before pruning",
        tx_count
    );

    let ro = factory.database_provider_ro().unwrap();

    // The hashes the pruning routine will recompute must match the ones stored above.
    let all_txs = ro.transactions_by_tx_range(0..tx_count).unwrap();
    assert_eq!(all_txs.len(), tx_count as usize, "Should be able to fetch all transactions");

    for (i, (tx, stored_hash)) in all_txs.iter().zip(&tx_hashes).enumerate() {
        let computed_hash = tx.trie_hash();
        assert_eq!(
            computed_hash, *stored_hash,
            "Hash mismatch for tx {}: stored {:?} vs computed {:?}",
            i, stored_hash, computed_hash
        );
    }

    let max_tx_to_keep = 5u64;
    let first_pruned_tx = max_tx_to_keep + 1;
    let last_pruned_tx = tx_count - 1;

    db.prune_transaction_hash_numbers_in_range(&ro, first_pruned_tx..=last_pruned_tx)
        .expect("prune should succeed");

    // Every surviving entry must map to a transaction number that was meant to be kept.
    let surviving_tx_numbers: Vec<_> = db
        .iter::<tables::TransactionHashNumbers>()
        .unwrap()
        .map(|entry| entry.unwrap().1)
        .collect();
    for tx_num in &surviving_tx_numbers {
        assert!(
            *tx_num <= max_tx_to_keep,
            "Transaction {} should have been pruned (> {})",
            tx_num,
            max_tx_to_keep
        );
    }
    assert_eq!(
        surviving_tx_numbers.len(),
        (max_tx_to_keep + 1) as usize,
        "Should have {} transactions (0-{})",
        max_tx_to_keep + 1,
        max_tx_to_keep
    );
}
1015
/// An `IndexAccountHistory` checkpoint at block 100 with no static file data makes the check
/// return `Some(0)` as the unwind target.
#[test]
fn test_check_consistency_accounts_history_empty_with_checkpoint_is_first_run() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Persist the checkpoint in its own read-write transaction.
    {
        let rw = factory.database_provider_rw().unwrap();
        let history_checkpoint = StageCheckpoint::new(100);
        rw.save_stage_checkpoint(StageId::IndexAccountHistory, history_checkpoint).unwrap();
        rw.commit().unwrap();
    }

    let ro = factory.database_provider_ro().unwrap();
    let outcome = db.check_consistency(&ro).unwrap();

    assert_eq!(outcome, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
}
1042
/// `AccountsHistory` data without any checkpoint is cleared and no unwind is requested.
#[test]
fn test_check_consistency_accounts_history_has_data_no_checkpoint_prunes_data() {
    use reth_db_api::models::ShardedKey;

    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    // Seed a single shard so the table is non-empty.
    db.put::<tables::AccountsHistory>(
        ShardedKey::new(Address::ZERO, 50),
        &BlockNumberList::new_pre_sorted([10, 20, 30, 50]),
    )
    .unwrap();
    assert!(db.last::<tables::AccountsHistory>().unwrap().is_some());

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let ro = factory.database_provider_ro().unwrap();
    let outcome = db.check_consistency(&ro).unwrap();
    assert_eq!(outcome, None, "Should heal by pruning, no unwind needed");

    let leftover = db.last::<tables::AccountsHistory>().unwrap();
    assert!(leftover.is_none(), "RocksDB should be empty after pruning");
}
1075
/// When the `AccountChangeSets` static file tip equals the `IndexAccountHistory` checkpoint
/// (both at block 100), the check requests no unwind and leaves `AccountsHistory` untouched.
#[test]
fn test_check_consistency_accounts_history_sf_tip_equals_checkpoint_no_action() {
    use reth_db::models::AccountBeforeTx;
    use reth_db_api::models::ShardedKey;
    use reth_static_file_types::StaticFileSegment;

    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    // Seed three shards: two with concrete highest block numbers and one keyed `u64::MAX`.
    let seeded_shards = [
        (ShardedKey::new(Address::ZERO, 50), BlockNumberList::new_pre_sorted([10, 20, 30, 50])),
        (ShardedKey::new(Address::random(), 75), BlockNumberList::new_pre_sorted([40, 60, 75])),
        (
            ShardedKey::new(Address::random(), u64::MAX),
            BlockNumberList::new_pre_sorted([80, 90, 100]),
        ),
    ];
    for (shard_key, block_list) in seeded_shards {
        db.put::<tables::AccountsHistory>(shard_key, &block_list).unwrap();
    }

    let entries_before: Vec<_> =
        db.iter::<tables::AccountsHistory>().unwrap().map(Result::unwrap).collect();
    assert_eq!(entries_before.len(), 3, "Should have 3 entries before check");

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Write one account changeset per block for blocks 0..=100 into the static files.
    {
        let sf_provider = factory.static_file_provider();
        let mut writer = sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        for block_num in 0..=100 {
            let changeset = vec![AccountBeforeTx { address: Address::random(), info: None }];
            writer.append_account_changeset(changeset, block_num).unwrap();
        }

        writer.commit().unwrap();
    }

    // Place the checkpoint on the same block as the static file tip.
    {
        let rw = factory.database_provider_rw().unwrap();
        let history_checkpoint = StageCheckpoint::new(100);
        rw.save_stage_checkpoint(StageId::IndexAccountHistory, history_checkpoint).unwrap();
        rw.commit().unwrap();
    }

    let ro = factory.database_provider_ro().unwrap();

    let sf_tip = ro
        .static_file_provider()
        .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
        .unwrap();
    assert_eq!(sf_tip, 100, "Static file tip should be 100");

    let outcome = db.check_consistency(&ro).unwrap();
    assert_eq!(outcome, None, "sf_tip == checkpoint should not require unwind");

    let entries_after: Vec<_> =
        db.iter::<tables::AccountsHistory>().unwrap().map(Result::unwrap).collect();

    assert_eq!(
        entries_after.len(),
        entries_before.len(),
        "RocksDB entry count should be unchanged when sf_tip == checkpoint"
    );

    // Compare the entries pairwise, in iteration order.
    for (before, after) in entries_before.iter().zip(&entries_after) {
        let (key_before, list_before) = before;
        let (key_after, list_after) = after;
        assert_eq!(key_before.key, key_after.key, "Entry key should be unchanged");
        assert_eq!(
            key_before.highest_block_number, key_after.highest_block_number,
            "Entry highest_block_number should be unchanged"
        );
        assert_eq!(list_before, list_after, "Entry block list should be unchanged");
    }
}
1161
#[test]
fn test_check_consistency_storages_history_heals_via_changesets_large_range() {
    use alloy_primitives::U256;
    use reth_db_api::models::StorageBeforeTx;

    const TOTAL_BLOCKS: u64 = 15_000;
    const CHECKPOINT_BLOCK: u64 = 5_000;

    /// Builds an address whose first eight bytes are the little-endian block number.
    fn make_address(block_num: u64) -> Address {
        let mut raw = [0u8; 20];
        raw[..8].copy_from_slice(&block_num.to_le_bytes());
        Address::from(raw)
    }

    /// Builds a storage slot whose first eight bytes are the little-endian block number.
    fn make_slot(block_num: u64) -> B256 {
        let mut raw = [0u8; 32];
        raw[..8].copy_from_slice(&block_num.to_le_bytes());
        B256::from(raw)
    }

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Write one single-entry storage changeset per block (0..TOTAL_BLOCKS) to static files.
    {
        let sf_provider = factory.static_file_provider();
        let mut writer = sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();

        for block_num in 0..TOTAL_BLOCKS {
            let entry = StorageBeforeTx {
                address: make_address(block_num),
                key: make_slot(block_num),
                value: U256::from(block_num),
            };
            writer.append_storage_changeset(vec![entry], block_num).unwrap();
        }

        writer.commit().unwrap();
    }

    // Sanity check: the static file segment now ends at the last written block.
    let highest = factory
        .static_file_provider()
        .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
        .unwrap();
    assert_eq!(highest, TOTAL_BLOCKS - 1, "Static files should have blocks 0..14999");

    // Record a storage-history checkpoint that lags behind the static file tip.
    let provider_rw = factory.database_provider_rw().unwrap();
    provider_rw
        .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(CHECKPOINT_BLOCK))
        .unwrap();
    provider_rw.commit().unwrap();

    // Populate RocksDB with one history shard for every block above the checkpoint.
    for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
        let shard_key =
            StorageShardedKey::new(make_address(block_num), make_slot(block_num), block_num);
        let shard_blocks = BlockNumberList::new_pre_sorted([block_num]);
        rocksdb.put::<tables::StoragesHistory>(shard_key, &shard_blocks).unwrap();
    }

    let expected_stale = TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1;
    let count_before = rocksdb.iter::<tables::StoragesHistory>().unwrap().count();
    assert_eq!(
        count_before, expected_stale as usize,
        "Should have {} stale entries before healing",
        expected_stale
    );

    // Run the consistency check; the test expects no unwind target to be reported.
    let provider = factory.database_provider_ro().unwrap();
    let unwind_target = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(unwind_target, None, "Should heal via changesets, no unwind needed");

    // No shard keyed above the checkpoint may remain afterwards.
    let remaining_stale = rocksdb
        .iter::<tables::StoragesHistory>()
        .unwrap()
        .map(|entry| entry.unwrap().0)
        .filter(|key| key.sharded_key.highest_block_number > CHECKPOINT_BLOCK)
        .count();
    assert_eq!(
        remaining_stale, 0,
        "All stale entries (block > {}) should be pruned",
        CHECKPOINT_BLOCK
    );
}
1286
#[test]
fn test_check_consistency_storages_history_preserves_checkpoint_block() {
    use alloy_primitives::U256;
    use reth_db_api::models::StorageBeforeTx;

    const CHECKPOINT_BLOCK: u64 = 100;
    const SF_TIP: u64 = 200;

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let checkpoint_addr = Address::repeat_byte(0xAA);
    let checkpoint_slot = B256::repeat_byte(0xBB);
    let stale_addr = Address::repeat_byte(0xCC);
    let stale_slot = B256::repeat_byte(0xDD);

    // Write changesets for blocks 0..=SF_TIP. The checkpoint block touches the checkpoint
    // slot, every later block touches the stale slot, and earlier blocks use zeroed values.
    {
        let sf_provider = factory.static_file_provider();
        let mut writer = sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();

        for block_num in 0..=SF_TIP {
            let entry = match block_num {
                CHECKPOINT_BLOCK => StorageBeforeTx {
                    address: checkpoint_addr,
                    key: checkpoint_slot,
                    value: U256::from(block_num),
                },
                later if later > CHECKPOINT_BLOCK => StorageBeforeTx {
                    address: stale_addr,
                    key: stale_slot,
                    value: U256::from(block_num),
                },
                _ => StorageBeforeTx { address: Address::ZERO, key: B256::ZERO, value: U256::ZERO },
            };
            writer.append_storage_changeset(vec![entry], block_num).unwrap();
        }
        writer.commit().unwrap();
    }

    // Persist the storage-history checkpoint below the static file tip.
    let provider_rw = factory.database_provider_rw().unwrap();
    provider_rw
        .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(CHECKPOINT_BLOCK))
        .unwrap();
    provider_rw.commit().unwrap();

    // Shard keyed exactly at the checkpoint block.
    let checkpoint_key = StorageShardedKey::new(checkpoint_addr, checkpoint_slot, CHECKPOINT_BLOCK);
    rocksdb
        .put::<tables::StoragesHistory>(
            checkpoint_key.clone(),
            &BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]),
        )
        .unwrap();

    // Shard that only references blocks above the checkpoint.
    let stale_key = StorageShardedKey::new(stale_addr, stale_slot, SF_TIP);
    rocksdb
        .put::<tables::StoragesHistory>(
            stale_key.clone(),
            &BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]),
        )
        .unwrap();

    let provider = factory.database_provider_ro().unwrap();
    let unwind_target = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(unwind_target, None, "Should heal without unwind");

    // The checkpoint shard must still be present...
    let preserved = rocksdb.get::<tables::StoragesHistory>(checkpoint_key).unwrap();
    assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");

    // ...while the shard above the checkpoint must be gone.
    let stale = rocksdb.get::<tables::StoragesHistory>(stale_key).unwrap();
    assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
}
1376
#[test]
fn test_check_consistency_accounts_history_heals_via_changesets_large_range() {
    use reth_db::models::AccountBeforeTx;
    use reth_db_api::models::ShardedKey;
    use reth_static_file_types::StaticFileSegment;

    const TOTAL_BLOCKS: u64 = 15_000;
    const CHECKPOINT_BLOCK: u64 = 5_000;
    /// Blocks at or below the checkpoint that get a history shard of their own.
    const VALID_BLOCKS: [u64; 5] = [100, 500, 1000, 2500, 5000];

    /// Builds an address whose first eight bytes are the little-endian block number.
    fn make_address(block_num: u64) -> Address {
        let mut raw = [0u8; 20];
        raw[..8].copy_from_slice(&block_num.to_le_bytes());
        Address::from(raw)
    }

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Write one single-entry account changeset per block (0..TOTAL_BLOCKS) to static files.
    {
        let sf_provider = factory.static_file_provider();
        let mut writer = sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        for block_num in 0..TOTAL_BLOCKS {
            let entry = AccountBeforeTx { address: make_address(block_num), info: None };
            writer.append_account_changeset(vec![entry], block_num).unwrap();
        }

        writer.commit().unwrap();
    }

    // Shards keyed above the checkpoint: these are the ones expected to be pruned.
    for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
        let shard_key = ShardedKey::new(make_address(block_num), block_num);
        let shard_blocks = BlockNumberList::new_pre_sorted([block_num]);
        rocksdb.put::<tables::AccountsHistory>(shard_key, &shard_blocks).unwrap();
    }

    // A handful of shards keyed at or below the checkpoint.
    for block_num in VALID_BLOCKS {
        let shard_key = ShardedKey::new(make_address(block_num), block_num);
        let shard_blocks = BlockNumberList::new_pre_sorted([block_num]);
        rocksdb.put::<tables::AccountsHistory>(shard_key, &shard_blocks).unwrap();
    }

    let stale_count = (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize;
    let valid_count = VALID_BLOCKS.len();
    let entries_before = rocksdb.iter::<tables::AccountsHistory>().unwrap().count();
    assert_eq!(
        entries_before,
        stale_count + valid_count,
        "Should have {} stale + {} valid entries before healing",
        stale_count,
        valid_count
    );

    // Record an account-history checkpoint that lags behind the static file tip.
    let provider_rw = factory.database_provider_rw().unwrap();
    provider_rw
        .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(CHECKPOINT_BLOCK))
        .unwrap();
    provider_rw.commit().unwrap();

    let provider = factory.database_provider_ro().unwrap();

    // Precondition: the static file tip sits above the checkpoint.
    let sf_tip = provider
        .static_file_provider()
        .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
        .unwrap();
    assert_eq!(sf_tip, TOTAL_BLOCKS - 1, "Static file tip should be 14999");
    assert!(sf_tip > CHECKPOINT_BLOCK, "sf_tip should be > checkpoint to trigger healing");

    let unwind_target = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(unwind_target, None, "Healing should succeed without requiring unwind");

    // Count shards still keyed above the checkpoint; keys equal to `u64::MAX` are not counted.
    let remaining_stale = rocksdb
        .iter::<tables::AccountsHistory>()
        .unwrap()
        .map(|entry| entry.unwrap().0)
        .filter(|key| {
            let highest = key.highest_block_number;
            highest > CHECKPOINT_BLOCK && highest != u64::MAX
        })
        .count();
    assert_eq!(
        remaining_stale, 0,
        "All stale entries (block > {}) should be pruned",
        CHECKPOINT_BLOCK
    );
}
1501
#[test]
fn test_check_consistency_accounts_history_preserves_checkpoint_block() {
    use reth_db::models::AccountBeforeTx;
    use reth_db_api::models::ShardedKey;

    const CHECKPOINT_BLOCK: u64 = 100;
    const SF_TIP: u64 = 200;

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let checkpoint_addr = Address::repeat_byte(0xAA);
    let stale_addr = Address::repeat_byte(0xCC);

    // Write changesets for blocks 0..=SF_TIP. Blocks below the checkpoint touch the zero
    // address, the checkpoint block touches `checkpoint_addr`, later blocks touch `stale_addr`.
    {
        let sf_provider = factory.static_file_provider();
        let mut writer = sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        for block_num in 0..=SF_TIP {
            let address = if block_num < CHECKPOINT_BLOCK {
                Address::ZERO
            } else if block_num == CHECKPOINT_BLOCK {
                checkpoint_addr
            } else {
                stale_addr
            };
            let entry = AccountBeforeTx { address, info: None };
            writer.append_account_changeset(vec![entry], block_num).unwrap();
        }
        writer.commit().unwrap();
    }

    // Persist the account-history checkpoint below the static file tip.
    let provider_rw = factory.database_provider_rw().unwrap();
    provider_rw
        .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(CHECKPOINT_BLOCK))
        .unwrap();
    provider_rw.commit().unwrap();

    // Shard keyed exactly at the checkpoint block.
    let checkpoint_key = ShardedKey::new(checkpoint_addr, CHECKPOINT_BLOCK);
    rocksdb
        .put::<tables::AccountsHistory>(
            checkpoint_key.clone(),
            &BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]),
        )
        .unwrap();

    // Shard that only references blocks above the checkpoint.
    let stale_key = ShardedKey::new(stale_addr, SF_TIP);
    rocksdb
        .put::<tables::AccountsHistory>(
            stale_key.clone(),
            &BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]),
        )
        .unwrap();

    let provider = factory.database_provider_ro().unwrap();
    let unwind_target = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(unwind_target, None, "Should heal without unwind");

    // The checkpoint shard must still be present...
    let preserved = rocksdb.get::<tables::AccountsHistory>(checkpoint_key).unwrap();
    assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");

    // ...while the shard above the checkpoint must be gone.
    let stale = rocksdb.get::<tables::AccountsHistory>(stale_key).unwrap();
    assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
}
1574
#[test]
fn test_check_consistency_storages_history_sf_tip_equals_checkpoint_no_action() {
    use alloy_primitives::U256;
    use reth_db::models::StorageBeforeTx;
    use reth_static_file_types::StaticFileSegment;

    /// Both the static file tip and the stage checkpoint are placed at this block.
    const TIP_AND_CHECKPOINT: u64 = 100;

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    // Seed two history shards, both keyed at or below the checkpoint.
    let first_key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
    let second_key = StorageShardedKey::new(Address::random(), B256::random(), 80);
    let first_blocks = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
    let second_blocks = BlockNumberList::new_pre_sorted([40, 60, 80]);
    rocksdb.put::<tables::StoragesHistory>(first_key, &first_blocks).unwrap();
    rocksdb.put::<tables::StoragesHistory>(second_key, &second_blocks).unwrap();

    // Snapshot the table so it can be compared after the consistency check.
    let entries_before: Vec<_> =
        rocksdb.iter::<tables::StoragesHistory>().unwrap().map(Result::unwrap).collect();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Write one storage changeset per block, up to and including the tip.
    {
        let sf_provider = factory.static_file_provider();
        let mut writer = sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();

        for block_num in 0..=TIP_AND_CHECKPOINT {
            let entry = StorageBeforeTx {
                address: Address::ZERO,
                key: B256::with_last_byte(block_num as u8),
                value: U256::from(block_num),
            };
            writer.append_storage_changeset(vec![entry], block_num).unwrap();
        }
        writer.commit().unwrap();
    }

    // Place the storage-history checkpoint exactly at the static file tip.
    let provider_rw = factory.database_provider_rw().unwrap();
    provider_rw
        .save_stage_checkpoint(
            StageId::IndexStorageHistory,
            StageCheckpoint::new(TIP_AND_CHECKPOINT),
        )
        .unwrap();
    provider_rw.commit().unwrap();

    let provider = factory.database_provider_ro().unwrap();

    let sf_tip = provider
        .static_file_provider()
        .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
        .unwrap();
    assert_eq!(sf_tip, 100, "Static file tip should be 100");

    let unwind_target = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(unwind_target, None, "sf_tip == checkpoint should not require unwind");

    let entries_after: Vec<_> =
        rocksdb.iter::<tables::StoragesHistory>().unwrap().map(Result::unwrap).collect();

    assert_eq!(
        entries_after.len(),
        entries_before.len(),
        "RocksDB entry count should be unchanged when sf_tip == checkpoint"
    );

    // Every key and block list must match the snapshot pairwise, in iteration order.
    for ((key_before, list_before), (key_after, list_after)) in
        entries_before.iter().zip(entries_after.iter())
    {
        assert_eq!(key_before, key_after, "Entry key should be unchanged");
        assert_eq!(list_before, list_after, "Entry block list should be unchanged");
    }
}
1655}