use super::RocksDBProvider;
use crate::StaticFileProviderFactory;
use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::BlockNumber;
use rayon::prelude::*;
use reth_db::cursor::DbCursorRO;
use reth_db_api::{tables, transaction::DbTx};
use reth_stages_types::StageId;
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{
    DBProvider, StageCheckpointReader, StorageSettingsCache, TransactionsProvider,
};
use reth_storage_errors::provider::ProviderResult;

impl RocksDBProvider {
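    /// Checks that the RocksDB-backed tables are consistent with the MDBX stage checkpoints and
    /// static files, healing by pruning RocksDB data where possible.
    ///
    /// Returns `Some(block_number)` when the pipeline has to unwind to that block to restore
    /// consistency, or `None` when no unwind is required.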
    pub fn check_consistency<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider
            + StageCheckpointReader
            + StorageSettingsCache
            + StaticFileProviderFactory
            + TransactionsProvider<Transaction: Encodable2718>,
    {
        let mut unwind_target: Option<BlockNumber> = None;

        if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb &&
            let Some(target) = self.check_transaction_hash_numbers(provider)?
        {
            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
        }

        if provider.cached_storage_settings().storages_history_in_rocksdb &&
            let Some(target) = self.check_storages_history(provider)?
        {
            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
        }

        if provider.cached_storage_settings().account_history_in_rocksdb &&
            let Some(target) = self.check_accounts_history(provider)?
        {
            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
        }

        Ok(unwind_target)
    }

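    /// Checks `TransactionHashNumbers` against the `TransactionLookup` checkpoint and the highest
    /// transaction in static files, pruning excess hash entries when static files are ahead of
    /// MDBX and returning an unwind target when MDBX is behind the checkpoint.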
    fn check_transaction_hash_numbers<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider
            + StageCheckpointReader
            + StaticFileProviderFactory
            + TransactionsProvider<Transaction: Encodable2718>,
    {
        let checkpoint = provider
            .get_stage_checkpoint(StageId::TransactionLookup)?
            .map(|cp| cp.block_number)
            .unwrap_or(0);

        let mut cursor = provider.tx_ref().cursor_read::<tables::TransactionBlocks>()?;
        let mdbx_last = cursor.last()?;

        let highest_static_tx = provider
            .static_file_provider()
            .get_highest_static_file_tx(StaticFileSegment::Transactions);

        match (mdbx_last, highest_static_tx) {
            (Some((mdbx_tx, mdbx_block)), Some(highest_tx)) if highest_tx > mdbx_tx => {
                tracing::info!(
                    target: "reth::providers::rocksdb",
                    mdbx_last_tx = mdbx_tx,
                    mdbx_block,
                    highest_static_tx = highest_tx,
                    "Static files ahead of MDBX, pruning TransactionHashNumbers excess data"
                );
                self.prune_transaction_hash_numbers_in_range(provider, (mdbx_tx + 1)..=highest_tx)?;

                if checkpoint > mdbx_block {
                    tracing::warn!(
                        target: "reth::providers::rocksdb",
                        mdbx_block,
                        checkpoint,
                        "MDBX behind checkpoint after pruning, unwind needed"
                    );
                    return Ok(Some(mdbx_block));
                }
            }
            (Some((_mdbx_tx, mdbx_block)), _) => {
                if checkpoint > mdbx_block {
                    tracing::warn!(
                        target: "reth::providers::rocksdb",
                        mdbx_block,
                        checkpoint,
                        "MDBX behind checkpoint, unwind needed"
                    );
                    return Ok(Some(mdbx_block));
                }
            }
            (None, Some(highest_tx)) => {
                tracing::info!(
                    target: "reth::providers::rocksdb",
                    highest_static_tx = highest_tx,
                    "MDBX empty but static files have data, pruning all TransactionHashNumbers"
                );
                self.prune_transaction_hash_numbers_in_range(provider, 0..=highest_tx)?;
            }
            (None, None) => {
                if checkpoint > 0 {
                    tracing::warn!(
                        target: "reth::providers::rocksdb",
                        checkpoint,
                        "Checkpoint set but no transaction data exists, unwind needed"
                    );
                    return Ok(Some(0));
                }
            }
        }

        Ok(None)
    }

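    /// Deletes the `TransactionHashNumbers` entries for every transaction in `tx_range` by
    /// re-computing the transaction hashes in parallel and removing them in a single batch.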
    fn prune_transaction_hash_numbers_in_range<Provider>(
        &self,
        provider: &Provider,
        tx_range: std::ops::RangeInclusive<u64>,
    ) -> ProviderResult<()>
    where
        Provider: TransactionsProvider<Transaction: Encodable2718>,
    {
        if tx_range.is_empty() {
            return Ok(());
        }

        let hashes: Vec<_> = provider
            .transactions_by_tx_range(tx_range.clone())?
            .into_par_iter()
            .map(|tx| tx.trie_hash())
            .collect();

        if !hashes.is_empty() {
            tracing::info!(
                target: "reth::providers::rocksdb",
                deleted_count = hashes.len(),
                tx_range_start = *tx_range.start(),
                tx_range_end = *tx_range.end(),
                "Pruning TransactionHashNumbers entries by tx range"
            );

            let mut batch = self.batch();
            for hash in hashes {
                batch.delete::<tables::TransactionHashNumbers>(hash)?;
            }
            batch.commit()?;
        }

        Ok(())
    }

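    /// Checks `StoragesHistory` against the `IndexStorageHistory` checkpoint, pruning shards
    /// above the checkpoint and returning an unwind target when the table is behind it.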
    fn check_storages_history<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + StageCheckpointReader,
    {
        let checkpoint = provider
            .get_stage_checkpoint(StageId::IndexStorageHistory)?
            .map(|cp| cp.block_number)
            .unwrap_or(0);

        let rocks_first = self.first::<tables::StoragesHistory>()?;

        match rocks_first {
            Some(_) => {
                if checkpoint == 0 {
                    tracing::info!(
                        target: "reth::providers::rocksdb",
                        "StoragesHistory has data but checkpoint is 0, clearing all"
                    );
                    self.prune_storages_history_above(0)?;
                    return Ok(None);
                }

                let mut max_highest_block = 0u64;
                for result in self.iter::<tables::StoragesHistory>()? {
                    let (key, _) = result?;
                    let highest = key.sharded_key.highest_block_number;
                    if highest != u64::MAX && highest > max_highest_block {
                        max_highest_block = highest;
                    }
                }

                if max_highest_block > checkpoint {
                    tracing::info!(
                        target: "reth::providers::rocksdb",
                        rocks_highest = max_highest_block,
                        checkpoint,
                        "StoragesHistory ahead of checkpoint, pruning excess data"
                    );
                    self.prune_storages_history_above(checkpoint)?;
                } else if max_highest_block < checkpoint {
                    tracing::warn!(
                        target: "reth::providers::rocksdb",
                        rocks_highest = max_highest_block,
                        checkpoint,
                        "StoragesHistory behind checkpoint, unwind needed"
                    );
                    return Ok(Some(max_highest_block));
                }

                Ok(None)
            }
            None => {
                if checkpoint > 0 {
                    return Ok(Some(0));
                }
                Ok(None)
            }
        }
    }

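    /// Removes `StoragesHistory` shards whose highest block number exceeds `max_block`.
    /// A `max_block` of 0 clears the table entirely, including `u64::MAX` sentinel shards.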
    fn prune_storages_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
        use reth_db_api::models::storage_sharded_key::StorageShardedKey;

        let mut to_delete: Vec<StorageShardedKey> = Vec::new();
        for result in self.iter::<tables::StoragesHistory>()? {
            let (key, _) = result?;
            let highest_block = key.sharded_key.highest_block_number;
            if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
                to_delete.push(key);
            }
        }

        let deleted = to_delete.len();
        if deleted > 0 {
            tracing::info!(
                target: "reth::providers::rocksdb",
                deleted_count = deleted,
                max_block,
                "Pruning StoragesHistory entries"
            );

            let mut batch = self.batch();
            for key in to_delete {
                batch.delete::<tables::StoragesHistory>(key)?;
            }
            batch.commit()?;
        }

        Ok(())
    }

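    /// Checks `AccountsHistory` against the `IndexAccountHistory` checkpoint, pruning shards
    /// above the checkpoint and returning an unwind target when the table is behind it.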
    fn check_accounts_history<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + StageCheckpointReader,
    {
        let checkpoint = provider
            .get_stage_checkpoint(StageId::IndexAccountHistory)?
            .map(|cp| cp.block_number)
            .unwrap_or(0);

        let rocks_first = self.first::<tables::AccountsHistory>()?;

        match rocks_first {
            Some(_) => {
                if checkpoint == 0 {
                    tracing::info!(
                        target: "reth::providers::rocksdb",
                        "AccountsHistory has data but checkpoint is 0, clearing all"
                    );
                    self.prune_accounts_history_above(0)?;
                    return Ok(None);
                }

                let mut max_highest_block = 0u64;
                for result in self.iter::<tables::AccountsHistory>()? {
                    let (key, _) = result?;
                    let highest = key.highest_block_number;
                    if highest != u64::MAX && highest > max_highest_block {
                        max_highest_block = highest;
                    }
                }

                if max_highest_block > checkpoint {
                    tracing::info!(
                        target: "reth::providers::rocksdb",
                        rocks_highest = max_highest_block,
                        checkpoint,
                        "AccountsHistory ahead of checkpoint, pruning excess data"
                    );
                    self.prune_accounts_history_above(checkpoint)?;
                    return Ok(None);
                }

                if max_highest_block < checkpoint {
                    tracing::warn!(
                        target: "reth::providers::rocksdb",
                        rocks_highest = max_highest_block,
                        checkpoint,
                        "AccountsHistory behind checkpoint, unwind needed"
                    );
                    return Ok(Some(max_highest_block));
                }

                Ok(None)
            }
            None => {
                if checkpoint > 0 {
                    return Ok(Some(0));
                }
                Ok(None)
            }
        }
    }

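    /// Removes `AccountsHistory` shards whose highest block number exceeds `max_block`.
    /// A `max_block` of 0 clears the table entirely, including `u64::MAX` sentinel shards.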
    fn prune_accounts_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
        use alloy_primitives::Address;
        use reth_db_api::models::ShardedKey;

        let mut to_delete: Vec<ShardedKey<Address>> = Vec::new();
        for result in self.iter::<tables::AccountsHistory>()? {
            let (key, _) = result?;
            let highest_block = key.highest_block_number;
            if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
                to_delete.push(key);
            }
        }

        let deleted = to_delete.len();
        if deleted > 0 {
            tracing::info!(
                target: "reth::providers::rocksdb",
                deleted_count = deleted,
                max_block,
                "Pruning AccountsHistory entries"
            );

            let mut batch = self.batch();
            for key in to_delete {
                batch.delete::<tables::AccountsHistory>(key)?;
            }
            batch.commit()?;
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        providers::rocksdb::RocksDBBuilder, test_utils::create_test_provider_factory, BlockWriter,
        DatabaseProviderFactory, StageCheckpointWriter, TransactionsProvider,
    };
    use alloy_primitives::{Address, B256};
    use reth_db::cursor::DbCursorRW;
    use reth_db_api::{
        models::{storage_sharded_key::StorageShardedKey, StorageSettings},
        tables::{self, BlockNumberList},
        transaction::DbTxMut,
    };
    use reth_stages_types::StageCheckpoint;
    use reth_testing_utils::generators::{self, BlockRangeParams};
    use tempfile::TempDir;

    #[test]
    fn test_first_last_empty_rocksdb() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        let first = provider.first::<tables::TransactionHashNumbers>().unwrap();
        let last = provider.last::<tables::TransactionHashNumbers>().unwrap();

        assert!(first.is_none());
        assert!(last.is_none());
    }

    #[test]
    fn test_first_last_with_data() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        let tx_hash = B256::from([1u8; 32]);
        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();

        let last = provider.last::<tables::TransactionHashNumbers>().unwrap();
        assert!(last.is_some());
        assert_eq!(last.unwrap().1, 100);
    }

    #[test]
    fn test_check_consistency_empty_rocksdb_no_checkpoint_is_ok() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy()
                .with_transaction_hash_numbers_in_rocksdb(true)
                .with_storages_history_in_rocksdb(true),
        );

        let provider = factory.database_provider_ro().unwrap();

        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None);
    }

    #[test]
    fn test_check_consistency_empty_rocksdb_with_checkpoint_needs_unwind() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
        );

        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild RocksDB");
    }

    #[test]
    fn test_check_consistency_mdbx_empty_static_files_have_data_prunes_rocksdb() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
        );

        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=2,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        // Insert blocks and mirror each transaction hash into RocksDB.
        let mut tx_hashes = Vec::new();
        {
            let provider = factory.database_provider_rw().unwrap();
            let mut tx_count = 0u64;
            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    tx_hashes.push(hash);
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }
            provider.commit().unwrap();
        }

        // Remove all TransactionBlocks entries from MDBX so the static files appear ahead of it.
        {
            let provider = factory.database_provider_rw().unwrap();
            let mut cursor =
                provider.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();
            let mut to_delete = Vec::new();
            let mut walker = cursor.walk(Some(0)).unwrap();
            while let Some((tx_num, _)) = walker.next().transpose().unwrap() {
                to_delete.push(tx_num);
            }
            drop(walker);
            for tx_num in to_delete {
                cursor.seek_exact(tx_num).unwrap();
                cursor.delete_current().unwrap();
            }
            provider.commit().unwrap();
        }

        assert!(rocksdb.last::<tables::TransactionHashNumbers>().unwrap().is_some());

        let provider = factory.database_provider_ro().unwrap();

        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        for hash in &tx_hashes {
            assert!(
                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
                "RocksDB should be empty after pruning"
            );
        }
    }

    #[test]
    fn test_check_consistency_storages_history_empty_with_checkpoint_needs_unwind() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_storages_history_in_rocksdb(true),
        );

        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild StoragesHistory");
    }

    #[test]
    fn test_check_consistency_storages_history_has_data_no_checkpoint_prunes_data() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
        rocksdb.put::<tables::StoragesHistory>(key, &block_list).unwrap();

        assert!(rocksdb.last::<tables::StoragesHistory>().unwrap().is_some());

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_storages_history_in_rocksdb(true),
        );

        let provider = factory.database_provider_ro().unwrap();

        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        assert!(
            rocksdb.last::<tables::StoragesHistory>().unwrap().is_none(),
            "RocksDB should be empty after pruning"
        );
    }

    #[test]
    fn test_check_consistency_storages_history_behind_checkpoint_needs_unwind() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        let key_block_50 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
        let key_block_80 = StorageShardedKey::new(Address::ZERO, B256::from([1u8; 32]), 80);
        let key_block_max = StorageShardedKey::new(Address::ZERO, B256::from([2u8; 32]), u64::MAX);

        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
        rocksdb.put::<tables::StoragesHistory>(key_block_50, &block_list).unwrap();
        rocksdb.put::<tables::StoragesHistory>(key_block_80, &block_list).unwrap();
        rocksdb.put::<tables::StoragesHistory>(key_block_max, &block_list).unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_storages_history_in_rocksdb(true),
        );

        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(80), "Should unwind to the highest block present in RocksDB");
    }

    #[test]
    fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
        );

        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=2,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        {
            let provider = factory.database_provider_rw().unwrap();
            let mut tx_count = 0u64;
            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }
            provider.commit().unwrap();
        }

        // Set the TransactionLookup checkpoint past the highest block stored in MDBX (block 2).
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(10))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(
            result,
            Some(2),
            "Should require unwind to block 2 (MDBX's last block) to rebuild from checkpoint"
        );
    }

    #[test]
    fn test_check_consistency_rocksdb_ahead_of_checkpoint_prunes_excess() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
        );

        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=5,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        let mut tx_hashes = Vec::new();
        let mut tx_count = 0u64;
        {
            let provider = factory.database_provider_rw().unwrap();
            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    tx_hashes.push(hash);
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }
            provider.commit().unwrap();
        }

        // Drop TransactionBlocks entries for blocks above 2 and set the checkpoint to block 2, so
        // RocksDB holds hashes for transactions that MDBX no longer tracks.
        {
            let provider = factory.database_provider_rw().unwrap();
            let mut cursor =
                provider.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();
            let mut to_delete = Vec::new();
            let mut walker = cursor.walk(Some(0)).unwrap();
            while let Some((tx_num, block_num)) = walker.next().transpose().unwrap() {
                if block_num > 2 {
                    to_delete.push(tx_num);
                }
            }
            drop(walker);
            for tx_num in to_delete {
                cursor.seek_exact(tx_num).unwrap();
                cursor.delete_current().unwrap();
            }

            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        // Blocks 0-2 contain two transactions each, so tx numbers 0-5 must survive the prune.
        for (i, hash) in tx_hashes.iter().take(6).enumerate() {
            assert!(
                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_some(),
                "tx {} should remain",
                i
            );
        }
        for (i, hash) in tx_hashes.iter().skip(6).enumerate() {
            assert!(
                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
                "tx {} should be pruned",
                i + 6
            );
        }
    }

    #[test]
    fn test_check_consistency_storages_history_ahead_of_checkpoint_prunes_excess() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        let key_block_50 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
        let key_block_100 = StorageShardedKey::new(Address::ZERO, B256::from([1u8; 32]), 100);
        let key_block_150 = StorageShardedKey::new(Address::ZERO, B256::from([2u8; 32]), 150);
        let key_block_max = StorageShardedKey::new(Address::ZERO, B256::from([3u8; 32]), u64::MAX);

        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
        rocksdb.put::<tables::StoragesHistory>(key_block_50.clone(), &block_list).unwrap();
        rocksdb.put::<tables::StoragesHistory>(key_block_100.clone(), &block_list).unwrap();
        rocksdb.put::<tables::StoragesHistory>(key_block_150.clone(), &block_list).unwrap();
        rocksdb.put::<tables::StoragesHistory>(key_block_max.clone(), &block_list).unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_storages_history_in_rocksdb(true),
        );

        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        assert!(
            rocksdb.get::<tables::StoragesHistory>(key_block_50).unwrap().is_some(),
            "Entry with highest_block=50 should remain"
        );
        assert!(
            rocksdb.get::<tables::StoragesHistory>(key_block_100).unwrap().is_some(),
            "Entry with highest_block=100 should remain"
        );
        assert!(
            rocksdb.get::<tables::StoragesHistory>(key_block_150).unwrap().is_none(),
            "Entry with highest_block=150 should be pruned"
        );
        assert!(
            rocksdb.get::<tables::StoragesHistory>(key_block_max).unwrap().is_some(),
            "Entry with highest_block=u64::MAX (sentinel) should remain"
        );
    }

    #[test]
    fn test_check_consistency_storages_history_behind_checkpoint_single_entry() {
        use reth_db_api::models::storage_sharded_key::StorageShardedKey;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        let key_block_50 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
        rocksdb.put::<tables::StoragesHistory>(key_block_50, &block_list).unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_storages_history_in_rocksdb(true),
        );

        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(
            result,
            Some(50),
            "Should require unwind to block 50 to rebuild StoragesHistory"
        );
    }

    #[test]
    fn test_prune_transaction_hash_numbers_by_range() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
        );

        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=5,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        let mut tx_count = 0u64;
        let mut tx_hashes = Vec::new();
        {
            let provider = factory.database_provider_rw().unwrap();

            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();

                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    tx_hashes.push(hash);
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }

            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
                .unwrap();
            provider.commit().unwrap();
        }

        let rocksdb_count_before: usize =
            rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().count();
        assert_eq!(
            rocksdb_count_before, tx_count as usize,
            "RocksDB should have all {} transaction hashes before pruning",
            tx_count
        );

        let provider = factory.database_provider_ro().unwrap();

        let all_txs = provider.transactions_by_tx_range(0..tx_count).unwrap();
        assert_eq!(all_txs.len(), tx_count as usize, "Should be able to fetch all transactions");

        for (i, tx) in all_txs.iter().enumerate() {
            let computed_hash = tx.trie_hash();
            assert_eq!(
                computed_hash, tx_hashes[i],
                "Hash mismatch for tx {}: stored {:?} vs computed {:?}",
                i, tx_hashes[i], computed_hash
            );
        }

        let max_tx_to_keep = 5u64;
        let tx_to_prune_start = max_tx_to_keep + 1;

        rocksdb
            .prune_transaction_hash_numbers_in_range(&provider, tx_to_prune_start..=(tx_count - 1))
            .expect("prune should succeed");

        let mut remaining_count = 0;
        for result in rocksdb.iter::<tables::TransactionHashNumbers>().unwrap() {
            let (_hash, tx_num) = result.unwrap();
            assert!(
                tx_num <= max_tx_to_keep,
                "Transaction {} should have been pruned (> {})",
                tx_num,
                max_tx_to_keep
            );
            remaining_count += 1;
        }
        assert_eq!(
            remaining_count,
            (max_tx_to_keep + 1) as usize,
            "Should have {} transactions (0-{})",
            max_tx_to_keep + 1,
            max_tx_to_keep
        );
    }

    #[test]
    fn test_check_consistency_accounts_history_empty_with_checkpoint_needs_unwind() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::AccountsHistory>()
            .build()
            .unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_account_history_in_rocksdb(true),
        );

        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild AccountsHistory");
    }

    #[test]
    fn test_check_consistency_accounts_history_has_data_no_checkpoint_prunes_data() {
        use reth_db_api::models::ShardedKey;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::AccountsHistory>()
            .build()
            .unwrap();

        let key = ShardedKey::new(Address::ZERO, 50);
        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
        rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();

        assert!(rocksdb.last::<tables::AccountsHistory>().unwrap().is_some());

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_account_history_in_rocksdb(true),
        );

        let provider = factory.database_provider_ro().unwrap();

        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        assert!(
            rocksdb.last::<tables::AccountsHistory>().unwrap().is_none(),
            "RocksDB should be empty after pruning"
        );
    }

    #[test]
    fn test_check_consistency_accounts_history_ahead_of_checkpoint_prunes_excess() {
        use reth_db_api::models::ShardedKey;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::AccountsHistory>()
            .build()
            .unwrap();

        let key_block_50 = ShardedKey::new(Address::ZERO, 50);
        let key_block_100 = ShardedKey::new(Address::random(), 100);
        let key_block_150 = ShardedKey::new(Address::random(), 150);
        let key_block_max = ShardedKey::new(Address::random(), u64::MAX);

        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
        rocksdb.put::<tables::AccountsHistory>(key_block_50.clone(), &block_list).unwrap();
        rocksdb.put::<tables::AccountsHistory>(key_block_100.clone(), &block_list).unwrap();
        rocksdb.put::<tables::AccountsHistory>(key_block_150.clone(), &block_list).unwrap();
        rocksdb.put::<tables::AccountsHistory>(key_block_max.clone(), &block_list).unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_account_history_in_rocksdb(true),
        );

        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        assert!(
            rocksdb.get::<tables::AccountsHistory>(key_block_50).unwrap().is_some(),
            "Entry with highest_block=50 should remain"
        );
        assert!(
            rocksdb.get::<tables::AccountsHistory>(key_block_100).unwrap().is_some(),
            "Entry with highest_block=100 should remain"
        );
        assert!(
            rocksdb.get::<tables::AccountsHistory>(key_block_150).unwrap().is_none(),
            "Entry with highest_block=150 should be pruned"
        );
        assert!(
            rocksdb.get::<tables::AccountsHistory>(key_block_max).unwrap().is_some(),
            "Entry with highest_block=u64::MAX (sentinel) should remain"
        );
    }

    #[test]
    fn test_check_consistency_accounts_history_behind_checkpoint_needs_unwind() {
        use reth_db_api::models::ShardedKey;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::AccountsHistory>()
            .build()
            .unwrap();

        let key_block_50 = ShardedKey::new(Address::ZERO, 50);
        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
        rocksdb.put::<tables::AccountsHistory>(key_block_50, &block_list).unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_account_history_in_rocksdb(true),
        );

        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(
            result,
            Some(50),
            "Should require unwind to block 50 to rebuild AccountsHistory"
        );
    }
}