1use super::RocksDBProvider;
8use crate::StaticFileProviderFactory;
9use alloy_eips::eip2718::Encodable2718;
10use alloy_primitives::BlockNumber;
11use rayon::prelude::*;
12use reth_db_api::tables;
13use reth_stages_types::StageId;
14use reth_static_file_types::StaticFileSegment;
15use reth_storage_api::{
16 BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StageCheckpointReader,
17 StorageChangeSetReader, StorageSettingsCache, TransactionsProvider,
18};
19use reth_storage_errors::provider::ProviderResult;
20use std::collections::HashSet;
21
/// Number of blocks whose changesets are read and unwound per batch when healing the
/// `StoragesHistory` and `AccountsHistory` tables.
const HEAL_HISTORY_BATCH_SIZE: u64 = 10_000;
25
26impl RocksDBProvider {
27 pub fn check_consistency<Provider>(
49 &self,
50 provider: &Provider,
51 ) -> ProviderResult<Option<BlockNumber>>
52 where
53 Provider: DBProvider
54 + StageCheckpointReader
55 + StorageSettingsCache
56 + StaticFileProviderFactory
57 + BlockBodyIndicesProvider
58 + StorageChangeSetReader
59 + ChangeSetReader
60 + TransactionsProvider<Transaction: Encodable2718>,
61 {
62 let mut unwind_target: Option<BlockNumber> = None;
63
64 if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb &&
66 let Some(target) = self.heal_transaction_hash_numbers(provider)?
67 {
68 unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
69 }
70
71 if provider.cached_storage_settings().storages_history_in_rocksdb &&
73 let Some(target) = self.heal_storages_history(provider)?
74 {
75 unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
76 }
77
78 if provider.cached_storage_settings().account_history_in_rocksdb &&
80 let Some(target) = self.heal_accounts_history(provider)?
81 {
82 unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
83 }
84
85 Ok(unwind_target)
86 }
87
88 fn heal_transaction_hash_numbers<Provider>(
95 &self,
96 provider: &Provider,
97 ) -> ProviderResult<Option<BlockNumber>>
98 where
99 Provider: DBProvider
100 + StageCheckpointReader
101 + StaticFileProviderFactory
102 + BlockBodyIndicesProvider
103 + TransactionsProvider<Transaction: Encodable2718>,
104 {
105 let checkpoint = provider
106 .get_stage_checkpoint(StageId::TransactionLookup)?
107 .map(|cp| cp.block_number)
108 .unwrap_or(0);
109
110 let sf_tip = provider
111 .static_file_provider()
112 .get_highest_static_file_block(StaticFileSegment::Transactions)
113 .unwrap_or(0);
114
115 if checkpoint == 0 && self.first::<tables::TransactionHashNumbers>()?.is_some() {
117 tracing::info!(
118 target: "reth::providers::rocksdb",
119 "TransactionHashNumbers has data but checkpoint is 0, clearing all"
120 );
121 self.clear::<tables::TransactionHashNumbers>()?;
122 return Ok(None);
123 }
124
125 if sf_tip < checkpoint {
126 tracing::warn!(
130 target: "reth::providers::rocksdb",
131 sf_tip,
132 checkpoint,
133 "TransactionHashNumbers: static file tip behind checkpoint, unwind needed"
134 );
135 return Ok(Some(sf_tip));
136 }
137
138 if sf_tip == checkpoint {
140 return Ok(None);
141 }
142
143 let sf_tip_end_tx = provider
145 .static_file_provider()
146 .get_highest_static_file_tx(StaticFileSegment::Transactions)
147 .unwrap_or(0);
148
149 let checkpoint_next_tx = provider
151 .block_body_indices(checkpoint)?
152 .map(|indices| indices.next_tx_num())
153 .unwrap_or(0);
154
155 if sf_tip_end_tx < checkpoint_next_tx {
156 tracing::warn!(
160 target: "reth::providers::rocksdb",
161 sf_tip_end_tx,
162 checkpoint_next_tx,
163 checkpoint,
164 sf_tip,
165 "TransactionHashNumbers: static file tx tip behind checkpoint, unwind needed"
166 );
167 return Ok(Some(sf_tip));
168 }
169
170 tracing::info!(
171 target: "reth::providers::rocksdb",
172 checkpoint,
173 sf_tip,
174 checkpoint_next_tx,
175 sf_tip_end_tx,
176 "TransactionHashNumbers: healing via transaction ranges"
177 );
178
179 const BATCH_SIZE: u64 = 10_000;
180 let mut batch_start = checkpoint_next_tx;
181
182 while batch_start <= sf_tip_end_tx {
183 let batch_end = batch_start.saturating_add(BATCH_SIZE - 1).min(sf_tip_end_tx);
184
185 tracing::debug!(
186 target: "reth::providers::rocksdb",
187 batch_start,
188 batch_end,
189 "Pruning TransactionHashNumbers batch"
190 );
191
192 self.prune_transaction_hash_numbers_in_range(provider, batch_start..=batch_end)?;
193
194 batch_start = batch_end.saturating_add(1);
195 }
196
197 Ok(None)
198 }
199
200 fn prune_transaction_hash_numbers_in_range<Provider>(
213 &self,
214 provider: &Provider,
215 tx_range: std::ops::RangeInclusive<u64>,
216 ) -> ProviderResult<()>
217 where
218 Provider: TransactionsProvider<Transaction: Encodable2718>,
219 {
220 if tx_range.is_empty() {
221 return Ok(());
222 }
223
224 let hashes: Vec<_> = provider
226 .transactions_by_tx_range(tx_range.clone())?
227 .into_par_iter()
228 .map(|tx| tx.trie_hash())
229 .collect();
230
231 if !hashes.is_empty() {
232 tracing::info!(
233 target: "reth::providers::rocksdb",
234 deleted_count = hashes.len(),
235 tx_range_start = *tx_range.start(),
236 tx_range_end = *tx_range.end(),
237 "Pruning TransactionHashNumbers entries by tx range"
238 );
239
240 let mut batch = self.batch();
241 for hash in hashes {
242 batch.delete::<tables::TransactionHashNumbers>(hash)?;
243 }
244 batch.commit()?;
245 }
246
247 Ok(())
248 }
249
250 fn heal_storages_history<Provider>(
255 &self,
256 provider: &Provider,
257 ) -> ProviderResult<Option<BlockNumber>>
258 where
259 Provider:
260 DBProvider + StageCheckpointReader + StaticFileProviderFactory + StorageChangeSetReader,
261 {
262 let checkpoint = provider
263 .get_stage_checkpoint(StageId::IndexStorageHistory)?
264 .map(|cp| cp.block_number)
265 .unwrap_or(0);
266
267 if checkpoint == 0 && self.first::<tables::StoragesHistory>()?.is_some() {
269 tracing::info!(
270 target: "reth::providers::rocksdb",
271 "StoragesHistory has data but checkpoint is 0, clearing all"
272 );
273 self.clear::<tables::StoragesHistory>()?;
274 return Ok(None);
275 }
276
277 let sf_tip = provider
278 .static_file_provider()
279 .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
280 .unwrap_or(0);
281
282 if sf_tip < checkpoint {
283 tracing::warn!(
287 target: "reth::providers::rocksdb",
288 sf_tip,
289 checkpoint,
290 "StoragesHistory: static file tip behind checkpoint, unwind needed"
291 );
292 return Ok(Some(sf_tip));
293 }
294
295 if sf_tip == checkpoint {
296 return Ok(None);
297 }
298
299 let total_blocks = sf_tip - checkpoint;
300 tracing::info!(
301 target: "reth::providers::rocksdb",
302 checkpoint,
303 sf_tip,
304 total_blocks,
305 "StoragesHistory: healing via changesets"
306 );
307
308 let mut batch_start = checkpoint + 1;
309 let mut batch_num = 0u64;
310 let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
311
312 while batch_start <= sf_tip {
313 let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
314 batch_num += 1;
315
316 let changesets = provider.storage_changesets_range(batch_start..=batch_end)?;
317
318 let unique_keys: HashSet<_> = changesets
319 .into_iter()
320 .map(|(block_addr, entry)| (block_addr.address(), entry.key, checkpoint + 1))
321 .collect();
322 let indices: Vec<_> = unique_keys.into_iter().collect();
323
324 if !indices.is_empty() {
325 tracing::info!(
326 target: "reth::providers::rocksdb",
327 batch_num,
328 total_batches,
329 batch_start,
330 batch_end,
331 indices_count = indices.len(),
332 "StoragesHistory: unwinding batch"
333 );
334
335 let batch = self.unwind_storage_history_indices(&indices)?;
336 self.commit_batch(batch)?;
337 }
338
339 batch_start = batch_end + 1;
340 }
341
342 Ok(None)
343 }
344
345 fn heal_accounts_history<Provider>(
350 &self,
351 provider: &Provider,
352 ) -> ProviderResult<Option<BlockNumber>>
353 where
354 Provider: DBProvider + StageCheckpointReader + StaticFileProviderFactory + ChangeSetReader,
355 {
356 let checkpoint = provider
357 .get_stage_checkpoint(StageId::IndexAccountHistory)?
358 .map(|cp| cp.block_number)
359 .unwrap_or(0);
360
361 if checkpoint == 0 && self.first::<tables::AccountsHistory>()?.is_some() {
363 tracing::info!(
364 target: "reth::providers::rocksdb",
365 "AccountsHistory has data but checkpoint is 0, clearing all"
366 );
367 self.clear::<tables::AccountsHistory>()?;
368 return Ok(None);
369 }
370
371 let sf_tip = provider
372 .static_file_provider()
373 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
374 .unwrap_or(0);
375
376 if sf_tip < checkpoint {
377 tracing::warn!(
381 target: "reth::providers::rocksdb",
382 sf_tip,
383 checkpoint,
384 "AccountsHistory: static file tip behind checkpoint, unwind needed"
385 );
386 return Ok(Some(sf_tip));
387 }
388
389 if sf_tip == checkpoint {
390 return Ok(None);
391 }
392
393 let total_blocks = sf_tip - checkpoint;
394 tracing::info!(
395 target: "reth::providers::rocksdb",
396 checkpoint,
397 sf_tip,
398 total_blocks,
399 "AccountsHistory: healing via changesets"
400 );
401
402 let mut batch_start = checkpoint + 1;
403 let mut batch_num = 0u64;
404 let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
405
406 while batch_start <= sf_tip {
407 let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
408 batch_num += 1;
409
410 let changesets = provider.account_changesets_range(batch_start..=batch_end)?;
411
412 let mut addresses = HashSet::with_capacity(changesets.len());
413 addresses.extend(changesets.iter().map(|(_, cs)| cs.address));
414 let unwind_from = checkpoint + 1;
415 let indices: Vec<_> = addresses.into_iter().map(|addr| (addr, unwind_from)).collect();
416
417 if !indices.is_empty() {
418 tracing::info!(
419 target: "reth::providers::rocksdb",
420 batch_num,
421 total_batches,
422 batch_start,
423 batch_end,
424 indices_count = indices.len(),
425 "AccountsHistory: unwinding batch"
426 );
427
428 let batch = self.unwind_account_history_indices(&indices)?;
429 self.commit_batch(batch)?;
430 }
431
432 batch_start = batch_end + 1;
433 }
434
435 Ok(None)
436 }
437}
438
439#[cfg(test)]
440mod tests {
441 use super::*;
442 use crate::{
443 providers::{rocksdb::RocksDBBuilder, static_file::StaticFileWriter},
444 test_utils::create_test_provider_factory,
445 BlockWriter, DatabaseProviderFactory, StageCheckpointWriter, TransactionsProvider,
446 };
447 use alloy_primitives::{Address, B256};
448 use reth_db::cursor::{DbCursorRO, DbCursorRW};
449 use reth_db_api::{
450 models::{storage_sharded_key::StorageShardedKey, StorageSettings},
451 tables::{self, BlockNumberList},
452 transaction::DbTxMut,
453 };
454 use reth_stages_types::StageCheckpoint;
455 use reth_testing_utils::generators::{self, BlockRangeParams};
456 use tempfile::TempDir;
457
/// A freshly built database reports neither a first nor a last entry.
#[test]
fn test_first_last_empty_rocksdb() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .with_table::<tables::StoragesHistory>()
        .build()
        .unwrap();

    let oldest = db.first::<tables::TransactionHashNumbers>().unwrap();
    let newest = db.last::<tables::TransactionHashNumbers>().unwrap();

    assert!(oldest.is_none());
    assert!(newest.is_none());
}
474
/// After a single `put`, `last` returns the stored value.
#[test]
fn test_first_last_with_data() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .build()
        .unwrap();

    // Store one hash -> tx number mapping.
    let hash = B256::from([1u8; 32]);
    db.put::<tables::TransactionHashNumbers>(hash, &100).unwrap();

    let newest = db.last::<tables::TransactionHashNumbers>().unwrap();
    assert!(newest.is_some());
    let (_, stored_tx_num) = newest.unwrap();
    assert_eq!(stored_tx_num, 100);
}
492
/// With empty tables and no stage checkpoints, the consistency check requests no unwind.
#[test]
fn test_check_consistency_empty_rocksdb_no_checkpoint_is_ok() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .with_table::<tables::StoragesHistory>()
        .build()
        .unwrap();

    // Route both tables to RocksDB.
    let settings = StorageSettings::legacy()
        .with_transaction_hash_numbers_in_rocksdb(true)
        .with_storages_history_in_rocksdb(true);
    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(settings);

    let reader = factory.database_provider_ro().unwrap();
    let outcome = rocksdb.check_consistency(&reader).unwrap();

    assert_eq!(outcome, None);
}
516
/// A `TransactionLookup` checkpoint at block 100 with no static files yields unwind target 0.
#[test]
fn test_check_consistency_empty_rocksdb_with_checkpoint_is_first_run() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .build()
        .unwrap();

    let factory = create_test_provider_factory();
    let settings = StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true);
    factory.set_storage_settings_cache(settings);

    // Persist the checkpoint; `commit` consumes the read-write provider.
    let writer = factory.database_provider_rw().unwrap();
    let lookup_checkpoint = StageCheckpoint::new(100);
    writer.save_stage_checkpoint(StageId::TransactionLookup, lookup_checkpoint).unwrap();
    writer.commit().unwrap();

    let reader = factory.database_provider_ro().unwrap();
    let outcome = rocksdb.check_consistency(&reader).unwrap();

    assert_eq!(outcome, Some(0), "Static file tip (0) behind checkpoint (100) triggers unwind");
}
547
/// With the checkpoint at 0 and data present in RocksDB, the check clears the table.
#[test]
fn test_check_consistency_checkpoint_zero_with_rocksdb_data_prunes_all() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .build()
        .unwrap();

    let factory = create_test_provider_factory();
    let settings = StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true);
    factory.set_storage_settings_cache(settings);

    let mut rng = generators::rng();
    let params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
    let blocks = generators::random_block_range(&mut rng, 0..=2, params);

    // Insert blocks 0..=2 and index each transaction hash in RocksDB by running tx number.
    let mut tx_hashes = Vec::new();
    let mut next_tx_num = 0u64;
    let writer = factory.database_provider_rw().unwrap();
    for block in &blocks {
        let recovered = block.clone().try_recover().expect("recover block");
        writer.insert_block(&recovered).unwrap();
        for hash in block.body().transactions.iter().map(|tx| tx.trie_hash()) {
            tx_hashes.push(hash);
            rocksdb.put::<tables::TransactionHashNumbers>(hash, &next_tx_num).unwrap();
            next_tx_num += 1;
        }
    }
    writer.commit().unwrap();

    // Explicitly record the checkpoint at block 0.
    let writer = factory.database_provider_rw().unwrap();
    writer.save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(0)).unwrap();
    writer.commit().unwrap();

    assert!(rocksdb.last::<tables::TransactionHashNumbers>().unwrap().is_some());

    let reader = factory.database_provider_ro().unwrap();
    let outcome = rocksdb.check_consistency(&reader).unwrap();
    assert_eq!(outcome, None, "Should heal by clearing, no unwind needed");

    for hash in tx_hashes.iter().copied() {
        let entry = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
        assert!(entry.is_none(), "RocksDB should be empty after pruning");
    }
}
616
/// An `IndexStorageHistory` checkpoint at block 100 with no static files yields unwind target 0.
#[test]
fn test_check_consistency_storages_history_empty_with_checkpoint_is_first_run() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path())
        .with_table::<tables::StoragesHistory>()
        .build()
        .unwrap();

    let factory = create_test_provider_factory();
    let settings = StorageSettings::legacy().with_storages_history_in_rocksdb(true);
    factory.set_storage_settings_cache(settings);

    // Persist the checkpoint; `commit` consumes the read-write provider.
    let writer = factory.database_provider_rw().unwrap();
    let history_checkpoint = StageCheckpoint::new(100);
    writer.save_stage_checkpoint(StageId::IndexStorageHistory, history_checkpoint).unwrap();
    writer.commit().unwrap();

    let reader = factory.database_provider_ro().unwrap();
    let outcome = rocksdb.check_consistency(&reader).unwrap();

    assert_eq!(outcome, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
}
648
/// `StoragesHistory` data without any checkpoint is cleared by the consistency check.
#[test]
fn test_check_consistency_storages_history_has_data_no_checkpoint_prunes_data() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path())
        .with_table::<tables::StoragesHistory>()
        .build()
        .unwrap();

    // Seed one history shard; no stage checkpoint is ever written in this test.
    let shard_key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
    let shard_blocks = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
    rocksdb.put::<tables::StoragesHistory>(shard_key, &shard_blocks).unwrap();
    assert!(rocksdb.last::<tables::StoragesHistory>().unwrap().is_some());

    let factory = create_test_provider_factory();
    let settings = StorageSettings::legacy().with_storages_history_in_rocksdb(true);
    factory.set_storage_settings_cache(settings);

    let reader = factory.database_provider_ro().unwrap();
    let outcome = rocksdb.check_consistency(&reader).unwrap();
    assert_eq!(outcome, None, "Should heal by pruning, no unwind needed");

    let leftover = rocksdb.last::<tables::StoragesHistory>().unwrap();
    assert!(leftover.is_none(), "RocksDB should be empty after pruning");
}
/// A checkpoint (10) above the available block tip (2) makes the check return `Some(2)`.
#[test]
fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .build()
        .unwrap();

    let factory = create_test_provider_factory();
    let settings = StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true);
    factory.set_storage_settings_cache(settings);

    let mut rng = generators::rng();
    let params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
    let blocks = generators::random_block_range(&mut rng, 0..=2, params);

    // Insert blocks 0..=2 and index each transaction hash in RocksDB by running tx number.
    let mut next_tx_num = 0u64;
    let writer = factory.database_provider_rw().unwrap();
    for block in &blocks {
        let recovered = block.clone().try_recover().expect("recover block");
        writer.insert_block(&recovered).unwrap();
        for hash in block.body().transactions.iter().map(|tx| tx.trie_hash()) {
            rocksdb.put::<tables::TransactionHashNumbers>(hash, &next_tx_num).unwrap();
            next_tx_num += 1;
        }
    }
    writer.commit().unwrap();

    // Claim the lookup stage progressed to block 10, beyond the inserted blocks.
    let writer = factory.database_provider_rw().unwrap();
    writer.save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(10)).unwrap();
    writer.commit().unwrap();

    let reader = factory.database_provider_ro().unwrap();
    let outcome = rocksdb.check_consistency(&reader).unwrap();

    assert_eq!(outcome, Some(2), "sf_tip < checkpoint requires unwind to sf_tip");
}
738
/// Entries for transactions above the checkpoint block are pruned; earlier ones are kept.
#[test]
fn test_check_consistency_rocksdb_ahead_of_checkpoint_prunes_excess() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .build()
        .unwrap();

    let factory = create_test_provider_factory();
    let settings = StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true);
    factory.set_storage_settings_cache(settings);

    let mut rng = generators::rng();
    let params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
    let blocks = generators::random_block_range(&mut rng, 0..=5, params);

    // Insert blocks 0..=5 and index each transaction hash in RocksDB by running tx number.
    let mut tx_hashes = Vec::new();
    let mut next_tx_num = 0u64;
    let writer = factory.database_provider_rw().unwrap();
    for block in &blocks {
        let recovered = block.clone().try_recover().expect("recover block");
        writer.insert_block(&recovered).unwrap();
        for hash in block.body().transactions.iter().map(|tx| tx.trie_hash()) {
            tx_hashes.push(hash);
            rocksdb.put::<tables::TransactionHashNumbers>(hash, &next_tx_num).unwrap();
            next_tx_num += 1;
        }
    }
    writer.commit().unwrap();

    {
        let provider = factory.database_provider_rw().unwrap();
        let mut cursor = provider.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();

        // Collect the `TransactionBlocks` rows that point at blocks above 2, then delete them.
        let stale_tx_nums: Vec<_> = cursor
            .walk(Some(0))
            .unwrap()
            .map(|row| row.unwrap())
            .filter(|(_, block_num)| *block_num > 2)
            .map(|(tx_num, _)| tx_num)
            .collect();
        for tx_num in stale_tx_nums {
            cursor.seek_exact(tx_num).unwrap();
            cursor.delete_current().unwrap();
        }

        provider
            .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
            .unwrap();
        provider.commit().unwrap();
    }

    let reader = factory.database_provider_ro().unwrap();
    let outcome = rocksdb.check_consistency(&reader).unwrap();
    assert_eq!(outcome, None, "Should heal by pruning, no unwind needed");

    // The first six hashes belong to blocks 0..=2 (two transactions per block).
    for (i, hash) in tx_hashes.iter().enumerate() {
        let entry = rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap();
        if i < 6 {
            assert!(entry.is_some(), "tx {} should remain", i);
        } else {
            assert!(entry.is_none(), "tx {} should be pruned", i);
        }
    }
}
839
/// Only `u64::MAX` shards exist and the checkpoint is 100 with no static files: unwind to 0.
#[test]
fn test_check_consistency_storages_history_sentinel_only_with_checkpoint_is_first_run() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path())
        .with_table::<tables::StoragesHistory>()
        .build()
        .unwrap();

    // Two shards whose highest block number is the `u64::MAX` sentinel.
    let sentinel_keys = [
        StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX),
        StorageShardedKey::new(Address::random(), B256::random(), u64::MAX),
    ];
    let shard_blocks = BlockNumberList::new_pre_sorted([10, 20, 30]);
    for key in sentinel_keys {
        rocksdb.put::<tables::StoragesHistory>(key, &shard_blocks).unwrap();
    }
    assert!(rocksdb.first::<tables::StoragesHistory>().unwrap().is_some());

    let factory = create_test_provider_factory();
    let settings = StorageSettings::legacy().with_storages_history_in_rocksdb(true);
    factory.set_storage_settings_cache(settings);

    let writer = factory.database_provider_rw().unwrap();
    let history_checkpoint = StageCheckpoint::new(100);
    writer.save_stage_checkpoint(StageId::IndexStorageHistory, history_checkpoint).unwrap();
    writer.commit().unwrap();

    let reader = factory.database_provider_ro().unwrap();
    let outcome = rocksdb.check_consistency(&reader).unwrap();

    assert_eq!(outcome, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
}
882
/// Only `u64::MAX` shards exist and the checkpoint is 100 with no static files: unwind to 0.
#[test]
fn test_check_consistency_accounts_history_sentinel_only_with_checkpoint_is_first_run() {
    use reth_db_api::models::ShardedKey;

    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path())
        .with_table::<tables::AccountsHistory>()
        .build()
        .unwrap();

    // Two shards whose highest block number is the `u64::MAX` sentinel.
    let sentinel_keys = [
        ShardedKey::new(Address::ZERO, u64::MAX),
        ShardedKey::new(Address::random(), u64::MAX),
    ];
    let shard_blocks = BlockNumberList::new_pre_sorted([10, 20, 30]);
    for key in sentinel_keys {
        rocksdb.put::<tables::AccountsHistory>(key, &shard_blocks).unwrap();
    }
    assert!(rocksdb.first::<tables::AccountsHistory>().unwrap().is_some());

    let factory = create_test_provider_factory();
    let settings = StorageSettings::legacy().with_account_history_in_rocksdb(true);
    factory.set_storage_settings_cache(settings);

    let writer = factory.database_provider_rw().unwrap();
    let history_checkpoint = StageCheckpoint::new(100);
    writer.save_stage_checkpoint(StageId::IndexAccountHistory, history_checkpoint).unwrap();
    writer.commit().unwrap();

    let reader = factory.database_provider_ro().unwrap();
    let outcome = rocksdb.check_consistency(&reader).unwrap();

    assert_eq!(outcome, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
}
926
/// Pruning a tx-number range removes exactly the hash entries of that range.
#[test]
fn test_prune_transaction_hash_numbers_by_range() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .build()
        .unwrap();

    let factory = create_test_provider_factory();
    let settings = StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true);
    factory.set_storage_settings_cache(settings);

    let mut rng = generators::rng();
    let params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
    let blocks = generators::random_block_range(&mut rng, 0..=5, params);

    // Insert blocks 0..=5 and index each transaction hash in RocksDB by running tx number.
    let mut tx_hashes = Vec::new();
    let mut tx_count = 0u64;
    let writer = factory.database_provider_rw().unwrap();
    for block in &blocks {
        let recovered = block.clone().try_recover().expect("recover block");
        writer.insert_block(&recovered).unwrap();

        for hash in block.body().transactions.iter().map(|tx| tx.trie_hash()) {
            tx_hashes.push(hash);
            rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
            tx_count += 1;
        }
    }
    writer.save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2)).unwrap();
    writer.commit().unwrap();

    let rocksdb_count_before = rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().count();
    assert_eq!(
        rocksdb_count_before, tx_count as usize,
        "RocksDB should have all {} transaction hashes before pruning",
        tx_count
    );

    let provider = factory.database_provider_ro().unwrap();

    // The provider must return every transaction, hashing to the values recorded above.
    let all_txs = provider.transactions_by_tx_range(0..tx_count).unwrap();
    assert_eq!(all_txs.len(), tx_count as usize, "Should be able to fetch all transactions");

    for (i, (tx, stored)) in all_txs.iter().zip(&tx_hashes).enumerate() {
        let computed_hash = tx.trie_hash();
        assert_eq!(
            computed_hash, *stored,
            "Hash mismatch for tx {}: stored {:?} vs computed {:?}",
            i, stored, computed_hash
        );
    }

    // Keep tx numbers 0..=5 and prune everything after them.
    let max_tx_to_keep = 5u64;
    let prune_range = (max_tx_to_keep + 1)..=(tx_count - 1);
    rocksdb
        .prune_transaction_hash_numbers_in_range(&provider, prune_range)
        .expect("prune should succeed");

    let remaining: Vec<_> = rocksdb
        .iter::<tables::TransactionHashNumbers>()
        .unwrap()
        .map(|row| row.unwrap().1)
        .collect();
    for tx_num in remaining.iter().copied() {
        assert!(
            tx_num <= max_tx_to_keep,
            "Transaction {} should have been pruned (> {})",
            tx_num,
            max_tx_to_keep
        );
    }
    assert_eq!(
        remaining.len(),
        (max_tx_to_keep + 1) as usize,
        "Should have {} transactions (0-{})",
        max_tx_to_keep + 1,
        max_tx_to_keep
    );
}
1042
/// An `IndexAccountHistory` checkpoint at block 100 with no static files yields unwind target 0.
#[test]
fn test_check_consistency_accounts_history_empty_with_checkpoint_is_first_run() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path())
        .with_table::<tables::AccountsHistory>()
        .build()
        .unwrap();

    let factory = create_test_provider_factory();
    let settings = StorageSettings::legacy().with_account_history_in_rocksdb(true);
    factory.set_storage_settings_cache(settings);

    // Persist the checkpoint; `commit` consumes the read-write provider.
    let writer = factory.database_provider_rw().unwrap();
    let history_checkpoint = StageCheckpoint::new(100);
    writer.save_stage_checkpoint(StageId::IndexAccountHistory, history_checkpoint).unwrap();
    writer.commit().unwrap();

    let reader = factory.database_provider_ro().unwrap();
    let outcome = rocksdb.check_consistency(&reader).unwrap();

    assert_eq!(outcome, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
}
1074
/// `AccountsHistory` data without any checkpoint is cleared by the consistency check.
#[test]
fn test_check_consistency_accounts_history_has_data_no_checkpoint_prunes_data() {
    use reth_db_api::models::ShardedKey;

    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path())
        .with_table::<tables::AccountsHistory>()
        .build()
        .unwrap();

    // Seed one history shard; no stage checkpoint is ever written in this test.
    let shard_key = ShardedKey::new(Address::ZERO, 50);
    let shard_blocks = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
    rocksdb.put::<tables::AccountsHistory>(shard_key, &shard_blocks).unwrap();
    assert!(rocksdb.last::<tables::AccountsHistory>().unwrap().is_some());

    let factory = create_test_provider_factory();
    let settings = StorageSettings::legacy().with_account_history_in_rocksdb(true);
    factory.set_storage_settings_cache(settings);

    let reader = factory.database_provider_ro().unwrap();
    let outcome = rocksdb.check_consistency(&reader).unwrap();
    assert_eq!(outcome, None, "Should heal by pruning, no unwind needed");

    let leftover = rocksdb.last::<tables::AccountsHistory>().unwrap();
    assert!(leftover.is_none(), "RocksDB should be empty after pruning");
}
1112
/// When the static-file tip equals the checkpoint, the check leaves RocksDB untouched.
#[test]
fn test_check_consistency_accounts_history_sf_tip_equals_checkpoint_no_action() {
    use reth_db::models::AccountBeforeTx;
    use reth_db_api::models::ShardedKey;
    use reth_static_file_types::StaticFileSegment;

    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path())
        .with_table::<tables::AccountsHistory>()
        .build()
        .unwrap();

    // Two bounded shards plus one shard keyed with `u64::MAX`.
    let bounded_key_a = ShardedKey::new(Address::ZERO, 50);
    let bounded_key_b = ShardedKey::new(Address::random(), 75);
    let max_key = ShardedKey::new(Address::random(), u64::MAX);
    let seeded = [
        (bounded_key_a, BlockNumberList::new_pre_sorted([10, 20, 30, 50])),
        (bounded_key_b, BlockNumberList::new_pre_sorted([40, 60, 75])),
        (max_key, BlockNumberList::new_pre_sorted([80, 90, 100])),
    ];
    for (key, shard_blocks) in seeded {
        rocksdb.put::<tables::AccountsHistory>(key, &shard_blocks).unwrap();
    }

    // Reads the complete table contents in iteration order.
    let snapshot = || -> Vec<_> {
        rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|row| row.unwrap()).collect()
    };

    let entries_before = snapshot();
    assert_eq!(entries_before.len(), 3, "Should have 3 entries before check");

    let factory = create_test_provider_factory();
    let settings = StorageSettings::legacy().with_account_history_in_rocksdb(true);
    factory.set_storage_settings_cache(settings);

    {
        // Write one account changeset per block for blocks 0..=100; the writer is scoped so
        // it is released before the static files are read back.
        let sf_provider = factory.static_file_provider();
        let mut changeset_writer =
            sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        for block_num in 0..=100 {
            let touched = AccountBeforeTx { address: Address::random(), info: None };
            changeset_writer.append_account_changeset(vec![touched], block_num).unwrap();
        }

        changeset_writer.commit().unwrap();
    }

    {
        let writer = factory.database_provider_rw().unwrap();
        let history_checkpoint = StageCheckpoint::new(100);
        writer.save_stage_checkpoint(StageId::IndexAccountHistory, history_checkpoint).unwrap();
        writer.commit().unwrap();
    }

    let provider = factory.database_provider_ro().unwrap();

    let sf_tip = provider
        .static_file_provider()
        .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
        .unwrap();
    assert_eq!(sf_tip, 100, "Static file tip should be 100");

    let outcome = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(outcome, None, "sf_tip == checkpoint should not require unwind");

    let entries_after = snapshot();
    assert_eq!(
        entries_after.len(),
        entries_before.len(),
        "RocksDB entry count should be unchanged when sf_tip == checkpoint"
    );

    for (before, after) in entries_before.iter().zip(&entries_after) {
        let (key_before, list_before) = before;
        let (key_after, list_after) = after;
        assert_eq!(key_before.key, key_after.key, "Entry key should be unchanged");
        assert_eq!(
            key_before.highest_block_number, key_after.highest_block_number,
            "Entry highest_block_number should be unchanged"
        );
        assert_eq!(list_before, list_after, "Entry block list should be unchanged");
    }
}
1203
1204 #[test]
1216 fn test_check_consistency_storages_history_heals_via_changesets_large_range() {
1217 use alloy_primitives::U256;
1218 use reth_db_api::models::StorageBeforeTx;
1219
1220 const TOTAL_BLOCKS: u64 = 15_000;
1221 const CHECKPOINT_BLOCK: u64 = 5_000;
1222
1223 let temp_dir = TempDir::new().unwrap();
1224 let rocksdb = RocksDBBuilder::new(temp_dir.path())
1225 .with_table::<tables::StoragesHistory>()
1226 .build()
1227 .unwrap();
1228
1229 let factory = create_test_provider_factory();
1230 factory.set_storage_settings_cache(
1231 StorageSettings::legacy()
1232 .with_storages_history_in_rocksdb(true)
1233 .with_storage_changesets_in_static_files(true),
1234 );
1235
1236 #[inline]
1238 fn make_address(block_num: u64) -> Address {
1239 let mut addr_bytes = [0u8; 20];
1240 addr_bytes[0..8].copy_from_slice(&block_num.to_le_bytes());
1241 Address::from(addr_bytes)
1242 }
1243
1244 #[inline]
1246 fn make_slot(block_num: u64) -> B256 {
1247 let mut slot_bytes = [0u8; 32];
1248 slot_bytes[0..8].copy_from_slice(&block_num.to_le_bytes());
1249 B256::from(slot_bytes)
1250 }
1251
1252 {
1255 let sf_provider = factory.static_file_provider();
1256 let mut writer =
1257 sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1258
1259 let mut changeset = Vec::with_capacity(1);
1261
1262 for block_num in 0..TOTAL_BLOCKS {
1263 changeset.clear();
1264 changeset.push(StorageBeforeTx {
1265 address: make_address(block_num),
1266 key: make_slot(block_num),
1267 value: U256::from(block_num),
1268 });
1269
1270 writer.append_storage_changeset(changeset.clone(), block_num).unwrap();
1271 }
1272
1273 writer.commit().unwrap();
1274 }
1275
1276 {
1278 let sf_provider = factory.static_file_provider();
1279 let highest = sf_provider
1280 .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
1281 .unwrap();
1282 assert_eq!(highest, TOTAL_BLOCKS - 1, "Static files should have blocks 0..14999");
1283 }
1284
1285 {
1287 let provider = factory.database_provider_rw().unwrap();
1288 provider
1289 .save_stage_checkpoint(
1290 StageId::IndexStorageHistory,
1291 StageCheckpoint::new(CHECKPOINT_BLOCK),
1292 )
1293 .unwrap();
1294 provider.commit().unwrap();
1295 }
1296
1297 for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
1300 let key =
1301 StorageShardedKey::new(make_address(block_num), make_slot(block_num), block_num);
1302 let block_list = BlockNumberList::new_pre_sorted([block_num]);
1303 rocksdb.put::<tables::StoragesHistory>(key, &block_list).unwrap();
1304 }
1305
1306 let count_before: usize = rocksdb.iter::<tables::StoragesHistory>().unwrap().count();
1308 assert_eq!(
1309 count_before,
1310 (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize,
1311 "Should have {} stale entries before healing",
1312 TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1
1313 );
1314
1315 let provider = factory.database_provider_ro().unwrap();
1317 let result = rocksdb.check_consistency(&provider).unwrap();
1318 assert_eq!(result, None, "Should heal via changesets, no unwind needed");
1319
1320 let mut remaining_stale = 0;
1323 for result in rocksdb.iter::<tables::StoragesHistory>().unwrap() {
1324 let (key, _) = result.unwrap();
1325 if key.sharded_key.highest_block_number > CHECKPOINT_BLOCK {
1326 remaining_stale += 1;
1327 }
1328 }
1329 assert_eq!(
1330 remaining_stale, 0,
1331 "All stale entries (block > {}) should be pruned",
1332 CHECKPOINT_BLOCK
1333 );
1334 }
1335
1336 #[test]
1340 fn test_check_consistency_storages_history_preserves_checkpoint_block() {
1341 use alloy_primitives::U256;
1342 use reth_db_api::models::StorageBeforeTx;
1343
1344 const CHECKPOINT_BLOCK: u64 = 100;
1345 const SF_TIP: u64 = 200;
1346
1347 let temp_dir = TempDir::new().unwrap();
1348 let rocksdb = RocksDBBuilder::new(temp_dir.path())
1349 .with_table::<tables::StoragesHistory>()
1350 .build()
1351 .unwrap();
1352
1353 let factory = create_test_provider_factory();
1354 factory.set_storage_settings_cache(
1355 StorageSettings::legacy()
1356 .with_storages_history_in_rocksdb(true)
1357 .with_storage_changesets_in_static_files(true),
1358 );
1359
1360 let checkpoint_addr = Address::repeat_byte(0xAA);
1361 let checkpoint_slot = B256::repeat_byte(0xBB);
1362 let stale_addr = Address::repeat_byte(0xCC);
1363 let stale_slot = B256::repeat_byte(0xDD);
1364
1365 {
1367 let sf_provider = factory.static_file_provider();
1368 let mut writer =
1369 sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1370
1371 for block_num in 0..=SF_TIP {
1372 let changeset = if block_num == CHECKPOINT_BLOCK {
1373 vec![StorageBeforeTx {
1374 address: checkpoint_addr,
1375 key: checkpoint_slot,
1376 value: U256::from(block_num),
1377 }]
1378 } else if block_num > CHECKPOINT_BLOCK {
1379 vec![StorageBeforeTx {
1380 address: stale_addr,
1381 key: stale_slot,
1382 value: U256::from(block_num),
1383 }]
1384 } else {
1385 vec![StorageBeforeTx {
1386 address: Address::ZERO,
1387 key: B256::ZERO,
1388 value: U256::ZERO,
1389 }]
1390 };
1391 writer.append_storage_changeset(changeset, block_num).unwrap();
1392 }
1393 writer.commit().unwrap();
1394 }
1395
1396 {
1398 let provider = factory.database_provider_rw().unwrap();
1399 provider
1400 .save_stage_checkpoint(
1401 StageId::IndexStorageHistory,
1402 StageCheckpoint::new(CHECKPOINT_BLOCK),
1403 )
1404 .unwrap();
1405 provider.commit().unwrap();
1406 }
1407
1408 let checkpoint_key =
1410 StorageShardedKey::new(checkpoint_addr, checkpoint_slot, CHECKPOINT_BLOCK);
1411 let checkpoint_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]);
1412 rocksdb.put::<tables::StoragesHistory>(checkpoint_key.clone(), &checkpoint_list).unwrap();
1413
1414 let stale_key = StorageShardedKey::new(stale_addr, stale_slot, SF_TIP);
1416 let stale_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]);
1417 rocksdb.put::<tables::StoragesHistory>(stale_key.clone(), &stale_list).unwrap();
1418
1419 let provider = factory.database_provider_ro().unwrap();
1421 let result = rocksdb.check_consistency(&provider).unwrap();
1422 assert_eq!(result, None, "Should heal without unwind");
1423
1424 let preserved = rocksdb.get::<tables::StoragesHistory>(checkpoint_key).unwrap();
1426 assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");
1427
1428 let stale = rocksdb.get::<tables::StoragesHistory>(stale_key).unwrap();
1430 assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
1431 }
1432
1433 #[test]
1447 fn test_check_consistency_accounts_history_heals_via_changesets_large_range() {
1448 use reth_db::models::AccountBeforeTx;
1449 use reth_db_api::models::ShardedKey;
1450 use reth_static_file_types::StaticFileSegment;
1451
1452 let temp_dir = TempDir::new().unwrap();
1453 let rocksdb = RocksDBBuilder::new(temp_dir.path())
1454 .with_table::<tables::AccountsHistory>()
1455 .build()
1456 .unwrap();
1457
1458 let factory = create_test_provider_factory();
1460 factory.set_storage_settings_cache(
1461 StorageSettings::legacy()
1462 .with_account_history_in_rocksdb(true)
1463 .with_account_changesets_in_static_files(true),
1464 );
1465
1466 const TOTAL_BLOCKS: u64 = 15_000;
1467 const CHECKPOINT_BLOCK: u64 = 5_000;
1468
1469 #[inline]
1471 fn make_address(block_num: u64) -> Address {
1472 let mut addr = Address::ZERO;
1473 addr.0[0..8].copy_from_slice(&block_num.to_le_bytes());
1474 addr
1475 }
1476
1477 {
1479 let sf_provider = factory.static_file_provider();
1480 let mut writer =
1481 sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1482
1483 let mut changeset = Vec::with_capacity(1);
1485
1486 for block_num in 0..TOTAL_BLOCKS {
1487 changeset.clear();
1488 changeset.push(AccountBeforeTx { address: make_address(block_num), info: None });
1489 writer.append_account_changeset(changeset.clone(), block_num).unwrap();
1490 }
1491
1492 writer.commit().unwrap();
1493 }
1494
1495 for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
1499 let key = ShardedKey::new(make_address(block_num), block_num);
1500 let block_list = BlockNumberList::new_pre_sorted([block_num]);
1501 rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
1502 }
1503
1504 for block_num in [100u64, 500, 1000, 2500, 5000] {
1506 let key = ShardedKey::new(make_address(block_num), block_num);
1507 let block_list = BlockNumberList::new_pre_sorted([block_num]);
1508 rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
1509 }
1510
1511 let entries_before: usize = rocksdb.iter::<tables::AccountsHistory>().unwrap().count();
1513 let stale_count = (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize;
1514 let valid_count = 5usize;
1515 assert_eq!(
1516 entries_before,
1517 stale_count + valid_count,
1518 "Should have {} stale + {} valid entries before healing",
1519 stale_count,
1520 valid_count
1521 );
1522
1523 {
1525 let provider = factory.database_provider_rw().unwrap();
1526 provider
1527 .save_stage_checkpoint(
1528 StageId::IndexAccountHistory,
1529 StageCheckpoint::new(CHECKPOINT_BLOCK),
1530 )
1531 .unwrap();
1532 provider.commit().unwrap();
1533 }
1534
1535 let provider = factory.database_provider_ro().unwrap();
1536
1537 let sf_tip = provider
1539 .static_file_provider()
1540 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
1541 .unwrap();
1542 assert_eq!(sf_tip, TOTAL_BLOCKS - 1, "Static file tip should be 14999");
1543 assert!(sf_tip > CHECKPOINT_BLOCK, "sf_tip should be > checkpoint to trigger healing");
1544
1545 let result = rocksdb.check_consistency(&provider).unwrap();
1547 assert_eq!(result, None, "Healing should succeed without requiring unwind");
1548
1549 let mut remaining_stale = 0;
1552 for result in rocksdb.iter::<tables::AccountsHistory>().unwrap() {
1553 let (key, _) = result.unwrap();
1554 if key.highest_block_number > CHECKPOINT_BLOCK && key.highest_block_number != u64::MAX {
1555 remaining_stale += 1;
1556 }
1557 }
1558 assert_eq!(
1559 remaining_stale, 0,
1560 "All stale entries (block > {}) should be pruned",
1561 CHECKPOINT_BLOCK
1562 );
1563 }
1564
1565 #[test]
1567 fn test_check_consistency_accounts_history_preserves_checkpoint_block() {
1568 use reth_db::models::AccountBeforeTx;
1569 use reth_db_api::models::ShardedKey;
1570
1571 const CHECKPOINT_BLOCK: u64 = 100;
1572 const SF_TIP: u64 = 200;
1573
1574 let temp_dir = TempDir::new().unwrap();
1575 let rocksdb = RocksDBBuilder::new(temp_dir.path())
1576 .with_table::<tables::AccountsHistory>()
1577 .build()
1578 .unwrap();
1579
1580 let factory = create_test_provider_factory();
1581 factory.set_storage_settings_cache(
1582 StorageSettings::legacy()
1583 .with_account_history_in_rocksdb(true)
1584 .with_account_changesets_in_static_files(true),
1585 );
1586
1587 let checkpoint_addr = Address::repeat_byte(0xAA);
1588 let stale_addr = Address::repeat_byte(0xCC);
1589
1590 {
1592 let sf_provider = factory.static_file_provider();
1593 let mut writer =
1594 sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1595
1596 for block_num in 0..=SF_TIP {
1597 let changeset = if block_num == CHECKPOINT_BLOCK {
1598 vec![AccountBeforeTx { address: checkpoint_addr, info: None }]
1599 } else if block_num > CHECKPOINT_BLOCK {
1600 vec![AccountBeforeTx { address: stale_addr, info: None }]
1601 } else {
1602 vec![AccountBeforeTx { address: Address::ZERO, info: None }]
1603 };
1604 writer.append_account_changeset(changeset, block_num).unwrap();
1605 }
1606 writer.commit().unwrap();
1607 }
1608
1609 {
1611 let provider = factory.database_provider_rw().unwrap();
1612 provider
1613 .save_stage_checkpoint(
1614 StageId::IndexAccountHistory,
1615 StageCheckpoint::new(CHECKPOINT_BLOCK),
1616 )
1617 .unwrap();
1618 provider.commit().unwrap();
1619 }
1620
1621 let checkpoint_key = ShardedKey::new(checkpoint_addr, CHECKPOINT_BLOCK);
1623 let checkpoint_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]);
1624 rocksdb.put::<tables::AccountsHistory>(checkpoint_key.clone(), &checkpoint_list).unwrap();
1625
1626 let stale_key = ShardedKey::new(stale_addr, SF_TIP);
1628 let stale_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]);
1629 rocksdb.put::<tables::AccountsHistory>(stale_key.clone(), &stale_list).unwrap();
1630
1631 let provider = factory.database_provider_ro().unwrap();
1633 let result = rocksdb.check_consistency(&provider).unwrap();
1634 assert_eq!(result, None, "Should heal without unwind");
1635
1636 let preserved = rocksdb.get::<tables::AccountsHistory>(checkpoint_key).unwrap();
1638 assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");
1639
1640 let stale = rocksdb.get::<tables::AccountsHistory>(stale_key).unwrap();
1642 assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
1643 }
1644
1645 #[test]
1646 fn test_check_consistency_storages_history_sf_tip_equals_checkpoint_no_action() {
1647 use alloy_primitives::U256;
1648 use reth_db::models::StorageBeforeTx;
1649 use reth_static_file_types::StaticFileSegment;
1650
1651 let temp_dir = TempDir::new().unwrap();
1652 let rocksdb = RocksDBBuilder::new(temp_dir.path())
1653 .with_table::<tables::StoragesHistory>()
1654 .build()
1655 .unwrap();
1656
1657 let key1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
1659 let key2 = StorageShardedKey::new(Address::random(), B256::random(), 80);
1660 let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
1661 let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 80]);
1662 rocksdb.put::<tables::StoragesHistory>(key1, &block_list1).unwrap();
1663 rocksdb.put::<tables::StoragesHistory>(key2, &block_list2).unwrap();
1664
1665 let entries_before: Vec<_> =
1667 rocksdb.iter::<tables::StoragesHistory>().unwrap().map(|r| r.unwrap()).collect();
1668
1669 let factory = create_test_provider_factory();
1671 factory.set_storage_settings_cache(
1672 StorageSettings::legacy().with_storages_history_in_rocksdb(true),
1673 );
1674
1675 {
1677 let sf_provider = factory.static_file_provider();
1678 let mut writer =
1679 sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1680
1681 for block_num in 0..=100u64 {
1682 let changeset = vec![StorageBeforeTx {
1683 address: Address::ZERO,
1684 key: B256::with_last_byte(block_num as u8),
1685 value: U256::from(block_num),
1686 }];
1687 writer.append_storage_changeset(changeset, block_num).unwrap();
1688 }
1689 writer.commit().unwrap();
1690 }
1691
1692 {
1694 let provider = factory.database_provider_rw().unwrap();
1695 provider
1696 .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
1697 .unwrap();
1698 provider.commit().unwrap();
1699 }
1700
1701 let provider = factory.database_provider_ro().unwrap();
1702
1703 let sf_tip = provider
1705 .static_file_provider()
1706 .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
1707 .unwrap();
1708 assert_eq!(sf_tip, 100, "Static file tip should be 100");
1709
1710 let result = rocksdb.check_consistency(&provider).unwrap();
1712 assert_eq!(result, None, "sf_tip == checkpoint should not require unwind");
1713
1714 let entries_after: Vec<_> =
1716 rocksdb.iter::<tables::StoragesHistory>().unwrap().map(|r| r.unwrap()).collect();
1717
1718 assert_eq!(
1719 entries_after.len(),
1720 entries_before.len(),
1721 "RocksDB entry count should be unchanged when sf_tip == checkpoint"
1722 );
1723
1724 for (before, after) in entries_before.iter().zip(entries_after.iter()) {
1726 assert_eq!(before.0, after.0, "Entry key should be unchanged");
1727 assert_eq!(before.1, after.1, "Entry block list should be unchanged");
1728 }
1729 }
1730}