1use super::RocksDBProvider;
8use crate::StaticFileProviderFactory;
9use alloy_eips::eip2718::Encodable2718;
10use alloy_primitives::BlockNumber;
11use rayon::prelude::*;
12use reth_chainspec::{ChainSpecProvider, EthChainSpec};
13use reth_db::models::{storage_sharded_key::StorageShardedKey, ShardedKey};
14use reth_db_api::tables;
15use reth_stages_types::StageId;
16use reth_static_file_types::StaticFileSegment;
17use reth_storage_api::{
18 BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StageCheckpointReader,
19 StorageChangeSetReader, StorageSettingsCache, TransactionsProvider,
20};
21use reth_storage_errors::provider::ProviderResult;
22use std::collections::HashSet;
23
24const HEAL_HISTORY_BATCH_SIZE: u64 = 10_000;
27
28impl RocksDBProvider {
29 pub fn check_consistency<Provider>(
51 &self,
52 provider: &Provider,
53 ) -> ProviderResult<Option<BlockNumber>>
54 where
55 Provider: DBProvider
56 + StageCheckpointReader
57 + StorageSettingsCache
58 + StaticFileProviderFactory
59 + BlockBodyIndicesProvider
60 + StorageChangeSetReader
61 + ChangeSetReader
62 + TransactionsProvider<Transaction: Encodable2718>
63 + ChainSpecProvider,
64 {
65 let mut unwind_target: Option<BlockNumber> = None;
66
67 if provider.cached_storage_settings().storage_v2 &&
69 let Some(target) = self.heal_transaction_hash_numbers(provider)?
70 {
71 unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
72 }
73
74 if provider.cached_storage_settings().storage_v2 &&
76 let Some(target) = self.heal_storages_history(provider)?
77 {
78 unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
79 }
80
81 if provider.cached_storage_settings().storage_v2 &&
83 let Some(target) = self.heal_accounts_history(provider)?
84 {
85 unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
86 }
87
88 Ok(unwind_target)
89 }
90
91 fn heal_transaction_hash_numbers<Provider>(
98 &self,
99 provider: &Provider,
100 ) -> ProviderResult<Option<BlockNumber>>
101 where
102 Provider: DBProvider
103 + StageCheckpointReader
104 + StaticFileProviderFactory
105 + BlockBodyIndicesProvider
106 + TransactionsProvider<Transaction: Encodable2718>,
107 {
108 let checkpoint = provider
109 .get_stage_checkpoint(StageId::TransactionLookup)?
110 .map(|cp| cp.block_number)
111 .unwrap_or(0);
112
113 let sf_tip = provider
114 .static_file_provider()
115 .get_highest_static_file_block(StaticFileSegment::Transactions)
116 .unwrap_or(0);
117
118 if checkpoint == 0 {
120 if self.first::<tables::TransactionHashNumbers>()?.is_some() {
121 tracing::info!(
122 target: "reth::providers::rocksdb",
123 "TransactionHashNumbers: checkpoint is 0, clearing stale data"
124 );
125 self.clear::<tables::TransactionHashNumbers>()?;
126 }
127
128 return Ok(None);
129 }
130
131 if sf_tip < checkpoint {
132 tracing::warn!(
136 target: "reth::providers::rocksdb",
137 sf_tip,
138 checkpoint,
139 "TransactionHashNumbers: static file tip behind checkpoint, unwind needed"
140 );
141 return Ok(Some(sf_tip));
142 }
143
144 if sf_tip == checkpoint {
146 return Ok(None);
147 }
148
149 let sf_tip_end_tx = provider
151 .static_file_provider()
152 .get_highest_static_file_tx(StaticFileSegment::Transactions)
153 .unwrap_or(0);
154
155 let checkpoint_next_tx = provider
157 .block_body_indices(checkpoint)?
158 .map(|indices| indices.next_tx_num())
159 .unwrap_or(0);
160
161 if sf_tip_end_tx < checkpoint_next_tx {
162 tracing::warn!(
166 target: "reth::providers::rocksdb",
167 sf_tip_end_tx,
168 checkpoint_next_tx,
169 checkpoint,
170 sf_tip,
171 "TransactionHashNumbers: static file tx tip behind checkpoint, unwind needed"
172 );
173 return Ok(Some(sf_tip));
174 }
175
176 tracing::info!(
177 target: "reth::providers::rocksdb",
178 checkpoint,
179 sf_tip,
180 checkpoint_next_tx,
181 sf_tip_end_tx,
182 "TransactionHashNumbers: healing via transaction ranges"
183 );
184
185 const BATCH_SIZE: u64 = 10_000;
186 let mut batch_start = checkpoint_next_tx;
187
188 while batch_start <= sf_tip_end_tx {
189 let batch_end = batch_start.saturating_add(BATCH_SIZE - 1).min(sf_tip_end_tx);
190
191 tracing::debug!(
192 target: "reth::providers::rocksdb",
193 batch_start,
194 batch_end,
195 "Pruning TransactionHashNumbers batch"
196 );
197
198 self.prune_transaction_hash_numbers_in_range(provider, batch_start..=batch_end)?;
199
200 batch_start = batch_end.saturating_add(1);
201 }
202
203 Ok(None)
204 }
205
206 fn prune_transaction_hash_numbers_in_range<Provider>(
219 &self,
220 provider: &Provider,
221 tx_range: std::ops::RangeInclusive<u64>,
222 ) -> ProviderResult<()>
223 where
224 Provider: TransactionsProvider<Transaction: Encodable2718>,
225 {
226 if tx_range.is_empty() {
227 return Ok(());
228 }
229
230 let hashes: Vec<_> = provider
232 .transactions_by_tx_range(tx_range.clone())?
233 .into_par_iter()
234 .map(|tx| tx.trie_hash())
235 .collect();
236
237 if !hashes.is_empty() {
238 tracing::info!(
239 target: "reth::providers::rocksdb",
240 deleted_count = hashes.len(),
241 tx_range_start = *tx_range.start(),
242 tx_range_end = *tx_range.end(),
243 "Pruning TransactionHashNumbers entries by tx range"
244 );
245
246 let mut batch = self.batch();
247 for hash in hashes {
248 batch.delete::<tables::TransactionHashNumbers>(hash)?;
249 }
250 batch.commit()?;
251 }
252
253 Ok(())
254 }
255
256 fn heal_storages_history<Provider>(
261 &self,
262 provider: &Provider,
263 ) -> ProviderResult<Option<BlockNumber>>
264 where
265 Provider: DBProvider
266 + StageCheckpointReader
267 + StaticFileProviderFactory
268 + StorageChangeSetReader
269 + ChainSpecProvider,
270 {
271 let checkpoint = provider
272 .get_stage_checkpoint(StageId::IndexStorageHistory)?
273 .map(|cp| cp.block_number)
274 .unwrap_or(0);
275
276 if checkpoint == 0 {
278 tracing::info!(
279 target: "reth::providers::rocksdb",
280 "StoragesHistory: checkpoint is 0, clearing stale data"
281 );
282 self.clear::<tables::StoragesHistory>()?;
283
284 let chain_spec = provider.chain_spec();
285 let genesis = chain_spec.genesis();
286 let list = tables::BlockNumberList::new([0]).expect("single block always fits");
287 for (addr, account) in &genesis.alloc {
288 if let Some(storage) = &account.storage {
289 for key in storage.keys() {
290 self.put::<tables::StoragesHistory>(
291 StorageShardedKey::last(*addr, *key),
292 &list,
293 )?;
294 }
295 }
296 }
297
298 return Ok(None);
299 }
300
301 let sf_tip = provider
302 .static_file_provider()
303 .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
304 .unwrap_or(0);
305
306 if sf_tip < checkpoint {
307 tracing::warn!(
311 target: "reth::providers::rocksdb",
312 sf_tip,
313 checkpoint,
314 "StoragesHistory: static file tip behind checkpoint, unwind needed"
315 );
316 return Ok(Some(sf_tip));
317 }
318
319 if sf_tip == checkpoint {
320 return Ok(None);
321 }
322
323 let total_blocks = sf_tip - checkpoint;
324 tracing::info!(
325 target: "reth::providers::rocksdb",
326 checkpoint,
327 sf_tip,
328 total_blocks,
329 "StoragesHistory: healing via changesets"
330 );
331
332 let mut batch_start = checkpoint + 1;
333 let mut batch_num = 0u64;
334 let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
335
336 while batch_start <= sf_tip {
337 let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
338 batch_num += 1;
339
340 let changesets = provider.storage_changesets_range(batch_start..=batch_end)?;
341
342 let unique_keys: HashSet<_> = changesets
343 .into_iter()
344 .map(|(block_addr, entry)| (block_addr.address(), entry.key, checkpoint + 1))
345 .collect();
346 let indices: Vec<_> = unique_keys.into_iter().collect();
347
348 if !indices.is_empty() {
349 tracing::info!(
350 target: "reth::providers::rocksdb",
351 batch_num,
352 total_batches,
353 batch_start,
354 batch_end,
355 indices_count = indices.len(),
356 "StoragesHistory: unwinding batch"
357 );
358
359 let batch = self.unwind_storage_history_indices(&indices)?;
360 self.commit_batch(batch)?;
361 }
362
363 batch_start = batch_end + 1;
364 }
365
366 Ok(None)
367 }
368
369 fn heal_accounts_history<Provider>(
374 &self,
375 provider: &Provider,
376 ) -> ProviderResult<Option<BlockNumber>>
377 where
378 Provider: DBProvider
379 + StageCheckpointReader
380 + StaticFileProviderFactory
381 + ChangeSetReader
382 + ChainSpecProvider,
383 {
384 let checkpoint = provider
385 .get_stage_checkpoint(StageId::IndexAccountHistory)?
386 .map(|cp| cp.block_number)
387 .unwrap_or(0);
388
389 if checkpoint == 0 {
391 tracing::info!(
392 target: "reth::providers::rocksdb",
393 "AccountsHistory: checkpoint is 0, clearing stale data"
394 );
395 self.clear::<tables::AccountsHistory>()?;
396
397 let chain_spec = provider.chain_spec();
398 let genesis = chain_spec.genesis();
399 let list = tables::BlockNumberList::new([0]).expect("single block always fits");
400 for addr in genesis.alloc.keys() {
401 self.put::<tables::AccountsHistory>(ShardedKey::last(*addr), &list)?;
402 }
403
404 return Ok(None);
405 }
406
407 let sf_tip = provider
408 .static_file_provider()
409 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
410 .unwrap_or(0);
411
412 if sf_tip < checkpoint {
413 tracing::warn!(
417 target: "reth::providers::rocksdb",
418 sf_tip,
419 checkpoint,
420 "AccountsHistory: static file tip behind checkpoint, unwind needed"
421 );
422 return Ok(Some(sf_tip));
423 }
424
425 if sf_tip == checkpoint {
426 return Ok(None);
427 }
428
429 let total_blocks = sf_tip - checkpoint;
430 tracing::info!(
431 target: "reth::providers::rocksdb",
432 checkpoint,
433 sf_tip,
434 total_blocks,
435 "AccountsHistory: healing via changesets"
436 );
437
438 let mut batch_start = checkpoint + 1;
439 let mut batch_num = 0u64;
440 let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
441
442 while batch_start <= sf_tip {
443 let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
444 batch_num += 1;
445
446 let changesets = provider.account_changesets_range(batch_start..=batch_end)?;
447
448 let mut addresses = HashSet::with_capacity(changesets.len());
449 addresses.extend(changesets.iter().map(|(_, cs)| cs.address));
450 let unwind_from = checkpoint + 1;
451 let indices: Vec<_> = addresses.into_iter().map(|addr| (addr, unwind_from)).collect();
452
453 if !indices.is_empty() {
454 tracing::info!(
455 target: "reth::providers::rocksdb",
456 batch_num,
457 total_batches,
458 batch_start,
459 batch_end,
460 indices_count = indices.len(),
461 "AccountsHistory: unwinding batch"
462 );
463
464 let batch = self.unwind_account_history_indices(&indices)?;
465 self.commit_batch(batch)?;
466 }
467
468 batch_start = batch_end + 1;
469 }
470
471 Ok(None)
472 }
473}
474
475#[cfg(test)]
476mod tests {
477 use std::sync::Arc;
478
479 use super::*;
480 use crate::{
481 init::insert_genesis_history,
482 providers::{rocksdb::RocksDBBuilder, static_file::StaticFileWriter},
483 test_utils::{create_test_provider_factory, create_test_provider_factory_with_chain_spec},
484 BlockWriter, DatabaseProviderFactory, RocksDBProviderFactory, StageCheckpointWriter,
485 TransactionsProvider,
486 };
487 use alloy_primitives::{Address, B256};
488 use reth_chainspec::MAINNET;
489 use reth_db::cursor::{DbCursorRO, DbCursorRW};
490 use reth_db_api::{
491 models::{storage_sharded_key::StorageShardedKey, StorageSettings},
492 tables::{self, BlockNumberList},
493 transaction::DbTxMut,
494 };
495 use reth_stages_types::StageCheckpoint;
496 use reth_testing_utils::generators::{self, BlockRangeParams};
497 use tempfile::TempDir;
498
499 #[test]
500 fn test_first_last_empty_rocksdb() {
501 let temp_dir = TempDir::new().unwrap();
502 let provider = RocksDBBuilder::new(temp_dir.path())
503 .with_table::<tables::TransactionHashNumbers>()
504 .with_table::<tables::StoragesHistory>()
505 .build()
506 .unwrap();
507
508 let first = provider.first::<tables::TransactionHashNumbers>().unwrap();
510 let last = provider.last::<tables::TransactionHashNumbers>().unwrap();
511
512 assert!(first.is_none());
513 assert!(last.is_none());
514 }
515
516 #[test]
517 fn test_first_last_with_data() {
518 let temp_dir = TempDir::new().unwrap();
519 let provider = RocksDBBuilder::new(temp_dir.path())
520 .with_table::<tables::TransactionHashNumbers>()
521 .build()
522 .unwrap();
523
524 let tx_hash = B256::from([1u8; 32]);
526 provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
527
528 let last = provider.last::<tables::TransactionHashNumbers>().unwrap();
530 assert!(last.is_some());
531 assert_eq!(last.unwrap().1, 100);
532 }
533
534 #[test]
535 fn test_check_consistency_empty_rocksdb_no_checkpoint_is_ok() {
536 let temp_dir = TempDir::new().unwrap();
537 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
538
539 let factory = create_test_provider_factory();
541 factory.set_storage_settings_cache(StorageSettings::v2());
542
543 let provider = factory.database_provider_ro().unwrap();
544
545 let result = rocksdb.check_consistency(&provider).unwrap();
547 assert_eq!(result, None);
548 }
549
550 #[test]
555 fn test_check_consistency_checkpoint_zero_empty_rocksdb_returns_early() {
556 let temp_dir = TempDir::new().unwrap();
557 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
558
559 let factory = create_test_provider_factory();
560 factory.set_storage_settings_cache(StorageSettings::v2());
561
562 let provider = factory.database_provider_ro().unwrap();
565
566 let result = rocksdb.heal_transaction_hash_numbers(&provider).unwrap();
567 assert_eq!(result, None, "TransactionHashNumbers should return early at checkpoint 0");
568 assert!(rocksdb.first::<tables::TransactionHashNumbers>().unwrap().is_none());
569
570 let result = rocksdb.heal_storages_history(&provider).unwrap();
571 assert_eq!(result, None, "StoragesHistory should return early at checkpoint 0");
572
573 let result = rocksdb.heal_accounts_history(&provider).unwrap();
574 assert_eq!(result, None, "AccountsHistory should return early at checkpoint 0");
575 assert_eq!(
577 rocksdb.iter::<tables::AccountsHistory>().unwrap().count(),
578 factory.chain_spec().genesis().alloc.len()
579 );
580 }
581
582 #[test]
583 fn test_check_consistency_empty_rocksdb_with_checkpoint_is_first_run() {
584 let temp_dir = TempDir::new().unwrap();
585 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
586
587 let factory = create_test_provider_factory();
589 factory.set_storage_settings_cache(StorageSettings::v2());
590
591 {
593 let provider = factory.database_provider_rw().unwrap();
594 provider
595 .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(100))
596 .unwrap();
597 provider.commit().unwrap();
598 }
599
600 let provider = factory.database_provider_ro().unwrap();
601
602 let result = rocksdb.check_consistency(&provider).unwrap();
605 assert_eq!(result, Some(0), "Static file tip (0) behind checkpoint (100) triggers unwind");
606 }
607
608 #[test]
611 fn test_check_consistency_checkpoint_zero_with_rocksdb_data_prunes_all() {
612 let temp_dir = TempDir::new().unwrap();
613 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
614
615 let factory = create_test_provider_factory();
616 factory.set_storage_settings_cache(StorageSettings::v2());
617
618 let mut rng = generators::rng();
620 let blocks = generators::random_block_range(
621 &mut rng,
622 0..=2,
623 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
624 );
625
626 let mut tx_hashes = Vec::new();
627 {
628 let provider = factory.database_provider_rw().unwrap();
629 let mut tx_count = 0u64;
630 for block in &blocks {
631 provider
632 .insert_block(&block.clone().try_recover().expect("recover block"))
633 .unwrap();
634 for tx in &block.body().transactions {
635 let hash = tx.trie_hash();
636 tx_hashes.push(hash);
637 rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
638 tx_count += 1;
639 }
640 }
641 provider.commit().unwrap();
642 }
643
644 {
646 let provider = factory.database_provider_rw().unwrap();
647 provider
648 .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(0))
649 .unwrap();
650 provider
651 .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
652 .unwrap();
653 provider
654 .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
655 .unwrap();
656 provider.commit().unwrap();
657 }
658
659 assert!(rocksdb.last::<tables::TransactionHashNumbers>().unwrap().is_some());
661
662 let provider = factory.database_provider_ro().unwrap();
663
664 let result = rocksdb.check_consistency(&provider).unwrap();
667 assert_eq!(result, None, "Should heal by clearing, no unwind needed");
668
669 for hash in &tx_hashes {
671 assert!(
672 rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
673 "RocksDB should be empty after pruning"
674 );
675 }
676 }
677
678 #[test]
679 fn test_check_consistency_storages_history_empty_with_checkpoint_is_first_run() {
680 let temp_dir = TempDir::new().unwrap();
681 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
682
683 let factory = create_test_provider_factory();
685 factory.set_storage_settings_cache(StorageSettings::v2());
686
687 {
689 let provider = factory.database_provider_rw().unwrap();
690 provider
691 .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
692 .unwrap();
693 provider.commit().unwrap();
694 }
695
696 let provider = factory.database_provider_ro().unwrap();
697
698 let result = rocksdb.check_consistency(&provider).unwrap();
702 assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
703 }
704
705 #[test]
706 fn test_check_consistency_storages_history_preserves_genesis_entries_at_checkpoint_zero(
707 ) -> eyre::Result<()> {
708 let mut chain_spec = MAINNET.clone();
710 Arc::make_mut(&mut chain_spec).genesis.alloc.first_entry().unwrap().get_mut().storage =
711 Some(From::from([(B256::random(), B256::random())]));
712
713 let factory = create_test_provider_factory_with_chain_spec(chain_spec);
715 let rocksdb = factory.rocksdb_provider();
716 factory.set_storage_settings_cache(StorageSettings::v2());
717
718 let provider_rw = factory.database_provider_rw().unwrap();
720 insert_genesis_history(&provider_rw, factory.chain_spec().genesis.alloc.iter())?;
721 provider_rw.commit()?;
722
723 let provider = factory.database_provider_ro().unwrap();
724
725 let result = rocksdb.heal_storages_history(&provider).unwrap();
727 assert_eq!(result, None, "Should skip healing when only genesis entries present");
728
729 assert!(
731 rocksdb.iter::<tables::StoragesHistory>().unwrap().count() > 0,
732 "Genesis entries should be preserved"
733 );
734
735 Ok(())
736 }
737
738 #[test]
739 fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() {
740 let temp_dir = TempDir::new().unwrap();
741 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
742
743 let factory = create_test_provider_factory();
744 factory.set_storage_settings_cache(StorageSettings::v2());
745
746 let mut rng = generators::rng();
748 let blocks = generators::random_block_range(
749 &mut rng,
750 0..=2,
751 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
752 );
753
754 {
755 let provider = factory.database_provider_rw().unwrap();
756 let mut tx_count = 0u64;
757 for block in &blocks {
758 provider
759 .insert_block(&block.clone().try_recover().expect("recover block"))
760 .unwrap();
761 for tx in &block.body().transactions {
762 let hash = tx.trie_hash();
763 rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
764 tx_count += 1;
765 }
766 }
767 provider.commit().unwrap();
768 }
769
770 {
774 let provider = factory.database_provider_rw().unwrap();
775 provider
776 .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(10))
777 .unwrap();
778 provider
780 .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
781 .unwrap();
782 provider
783 .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
784 .unwrap();
785 provider.commit().unwrap();
786 }
787
788 let provider = factory.database_provider_ro().unwrap();
789
790 let result = rocksdb.check_consistency(&provider).unwrap();
792 assert_eq!(result, Some(2), "sf_tip < checkpoint requires unwind to sf_tip");
793 }
794
795 #[test]
796 fn test_check_consistency_rocksdb_ahead_of_checkpoint_prunes_excess() {
797 let temp_dir = TempDir::new().unwrap();
798 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
799
800 let factory = create_test_provider_factory();
802 factory.set_storage_settings_cache(StorageSettings::v2());
803
804 let mut rng = generators::rng();
807 let blocks = generators::random_block_range(
808 &mut rng,
809 0..=5,
810 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
811 );
812
813 let mut tx_hashes = Vec::new();
815 let mut tx_count = 0u64;
816 {
817 let provider = factory.database_provider_rw().unwrap();
818 for block in &blocks {
820 provider
821 .insert_block(&block.clone().try_recover().expect("recover block"))
822 .unwrap();
823 for tx in &block.body().transactions {
824 let hash = tx.trie_hash();
825 tx_hashes.push(hash);
826 rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
827 tx_count += 1;
828 }
829 }
830 provider.commit().unwrap();
831 }
832
833 {
837 let provider = factory.database_provider_rw().unwrap();
838 let mut cursor = provider.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();
842 let mut to_delete = Vec::new();
844 let mut walker = cursor.walk(Some(0)).unwrap();
845 while let Some((tx_num, block_num)) = walker.next().transpose().unwrap() {
846 if block_num > 2 {
847 to_delete.push(tx_num);
848 }
849 }
850 drop(walker);
851 for tx_num in to_delete {
852 cursor.seek_exact(tx_num).unwrap();
853 cursor.delete_current().unwrap();
854 }
855
856 provider
858 .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
859 .unwrap();
860 provider
862 .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
863 .unwrap();
864 provider
865 .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
866 .unwrap();
867 provider.commit().unwrap();
868 }
869
870 let provider = factory.database_provider_ro().unwrap();
871
872 let result = rocksdb.check_consistency(&provider).unwrap();
877 assert_eq!(result, None, "Should heal by pruning, no unwind needed");
878
879 for (i, hash) in tx_hashes.iter().take(6).enumerate() {
882 assert!(
883 rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_some(),
884 "tx {} should remain",
885 i
886 );
887 }
888 for (i, hash) in tx_hashes.iter().skip(6).enumerate() {
890 assert!(
891 rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
892 "tx {} should be pruned",
893 i + 6
894 );
895 }
896 }
897
898 #[test]
899 fn test_check_consistency_storages_history_sentinel_only_with_checkpoint_is_first_run() {
900 let temp_dir = TempDir::new().unwrap();
901 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
902
903 let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
906 let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
907 let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
908 rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
909 rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
910
911 assert!(rocksdb.first::<tables::StoragesHistory>().unwrap().is_some());
913
914 let factory = create_test_provider_factory();
916 factory.set_storage_settings_cache(StorageSettings::v2());
917
918 {
920 let provider = factory.database_provider_rw().unwrap();
921 provider
922 .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
923 .unwrap();
924 provider.commit().unwrap();
925 }
926
927 let provider = factory.database_provider_ro().unwrap();
928
929 let result = rocksdb.check_consistency(&provider).unwrap();
933 assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
934 }
935
936 #[test]
937 fn test_check_consistency_accounts_history_sentinel_only_with_checkpoint_is_first_run() {
938 use reth_db_api::models::ShardedKey;
939
940 let temp_dir = TempDir::new().unwrap();
941 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
942
943 let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
945 let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
946 let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
947 rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
948 rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
949
950 assert!(rocksdb.first::<tables::AccountsHistory>().unwrap().is_some());
952
953 let factory = create_test_provider_factory();
955 factory.set_storage_settings_cache(StorageSettings::v2());
956
957 {
959 let provider = factory.database_provider_rw().unwrap();
960 provider
961 .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
962 .unwrap();
963 provider.commit().unwrap();
964 }
965
966 let provider = factory.database_provider_ro().unwrap();
967
968 let result = rocksdb.check_consistency(&provider).unwrap();
972 assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
973 }
974
975 #[test]
979 fn test_prune_transaction_hash_numbers_by_range() {
980 let temp_dir = TempDir::new().unwrap();
981 let rocksdb = RocksDBBuilder::new(temp_dir.path())
982 .with_table::<tables::TransactionHashNumbers>()
983 .build()
984 .unwrap();
985
986 let factory = create_test_provider_factory();
988 factory.set_storage_settings_cache(StorageSettings::v2());
989
990 let mut rng = generators::rng();
994 let blocks = generators::random_block_range(
995 &mut rng,
996 0..=5,
997 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
998 );
999
1000 let mut tx_count = 0u64;
1002 let mut tx_hashes = Vec::new();
1003 {
1004 let provider = factory.database_provider_rw().unwrap();
1005
1006 for block in &blocks {
1007 provider
1008 .insert_block(&block.clone().try_recover().expect("recover block"))
1009 .unwrap();
1010
1011 for tx in &block.body().transactions {
1013 let hash = tx.trie_hash();
1014 tx_hashes.push(hash);
1015 rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
1016 tx_count += 1;
1017 }
1018 }
1019
1020 provider
1023 .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
1024 .unwrap();
1025 provider.commit().unwrap();
1026 }
1027
1028 let rocksdb_count_before: usize =
1035 rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().count();
1036 assert_eq!(
1037 rocksdb_count_before, tx_count as usize,
1038 "RocksDB should have all {} transaction hashes before pruning",
1039 tx_count
1040 );
1041
1042 let provider = factory.database_provider_ro().unwrap();
1043
1044 let all_txs = provider.transactions_by_tx_range(0..tx_count).unwrap();
1046 assert_eq!(all_txs.len(), tx_count as usize, "Should be able to fetch all transactions");
1047
1048 for (i, tx) in all_txs.iter().enumerate() {
1050 let computed_hash = tx.trie_hash();
1051 assert_eq!(
1052 computed_hash, tx_hashes[i],
1053 "Hash mismatch for tx {}: stored {:?} vs computed {:?}",
1054 i, tx_hashes[i], computed_hash
1055 );
1056 }
1057
1058 let max_tx_to_keep = 5u64;
1061 let tx_to_prune_start = max_tx_to_keep + 1;
1062
1063 rocksdb
1065 .prune_transaction_hash_numbers_in_range(&provider, tx_to_prune_start..=(tx_count - 1))
1066 .expect("prune should succeed");
1067
1068 let mut remaining_count = 0;
1070 for result in rocksdb.iter::<tables::TransactionHashNumbers>().unwrap() {
1071 let (_hash, tx_num) = result.unwrap();
1072 assert!(
1073 tx_num <= max_tx_to_keep,
1074 "Transaction {} should have been pruned (> {})",
1075 tx_num,
1076 max_tx_to_keep
1077 );
1078 remaining_count += 1;
1079 }
1080 assert_eq!(
1081 remaining_count,
1082 (max_tx_to_keep + 1) as usize,
1083 "Should have {} transactions (0-{})",
1084 max_tx_to_keep + 1,
1085 max_tx_to_keep
1086 );
1087 }
1088
1089 #[test]
1090 fn test_check_consistency_accounts_history_empty_with_checkpoint_is_first_run() {
1091 let temp_dir = TempDir::new().unwrap();
1092 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1093
1094 let factory = create_test_provider_factory();
1096 factory.set_storage_settings_cache(StorageSettings::v2());
1097
1098 {
1100 let provider = factory.database_provider_rw().unwrap();
1101 provider
1102 .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
1103 .unwrap();
1104 provider.commit().unwrap();
1105 }
1106
1107 let provider = factory.database_provider_ro().unwrap();
1108
1109 let result = rocksdb.check_consistency(&provider).unwrap();
1113 assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
1114 }
1115
1116 #[test]
1117 fn test_check_consistency_accounts_history_preserves_genesis_entries_at_checkpoint_zero(
1118 ) -> eyre::Result<()> {
1119 let factory = create_test_provider_factory();
1121 factory.set_storage_settings_cache(StorageSettings::v2());
1122 let rocksdb = factory.rocksdb_provider();
1123
1124 let provider_rw = factory.database_provider_rw().unwrap();
1126 insert_genesis_history(&provider_rw, factory.chain_spec().genesis.alloc.iter())?;
1127 provider_rw.commit()?;
1128
1129 let provider = factory.database_provider_ro().unwrap();
1130
1131 let result = rocksdb.check_consistency(&provider).unwrap();
1133 assert_eq!(result, None, "Should heal by pruning, no unwind needed");
1134
1135 assert!(
1137 rocksdb.iter::<tables::AccountsHistory>().unwrap().count() > 0,
1138 "Genesis entries should be preserved"
1139 );
1140
1141 Ok(())
1142 }
1143
1144 #[test]
1145 fn test_check_consistency_accounts_history_sf_tip_equals_checkpoint_no_action() {
1146 use reth_db::models::AccountBeforeTx;
1147 use reth_db_api::models::ShardedKey;
1148 use reth_static_file_types::StaticFileSegment;
1149
1150 let temp_dir = TempDir::new().unwrap();
1151 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1152
1153 let key1 = ShardedKey::new(Address::ZERO, 50);
1155 let key2 = ShardedKey::new(Address::random(), 75);
1156 let key3 = ShardedKey::new(Address::random(), u64::MAX); let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
1158 let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 75]);
1159 let block_list3 = BlockNumberList::new_pre_sorted([80, 90, 100]);
1160 rocksdb.put::<tables::AccountsHistory>(key1, &block_list1).unwrap();
1161 rocksdb.put::<tables::AccountsHistory>(key2, &block_list2).unwrap();
1162 rocksdb.put::<tables::AccountsHistory>(key3, &block_list3).unwrap();
1163
1164 let entries_before: Vec<_> =
1166 rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|r| r.unwrap()).collect();
1167 assert_eq!(entries_before.len(), 3, "Should have 3 entries before check");
1168
1169 let factory = create_test_provider_factory();
1171 factory.set_storage_settings_cache(StorageSettings::v2());
1172
1173 {
1175 let sf_provider = factory.static_file_provider();
1176 let mut writer =
1177 sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1178
1179 for block_num in 0..=100 {
1180 let changeset = vec![AccountBeforeTx { address: Address::random(), info: None }];
1181 writer.append_account_changeset(changeset, block_num).unwrap();
1182 }
1183
1184 writer.commit().unwrap();
1185 }
1186
1187 {
1189 let provider = factory.database_provider_rw().unwrap();
1190 provider
1191 .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
1192 .unwrap();
1193 provider.commit().unwrap();
1194 }
1195
1196 let provider = factory.database_provider_ro().unwrap();
1197
1198 let sf_tip = provider
1200 .static_file_provider()
1201 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
1202 .unwrap();
1203 assert_eq!(sf_tip, 100, "Static file tip should be 100");
1204
1205 let result = rocksdb.check_consistency(&provider).unwrap();
1207 assert_eq!(result, None, "sf_tip == checkpoint should not require unwind");
1208
1209 let entries_after: Vec<_> =
1211 rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|r| r.unwrap()).collect();
1212
1213 assert_eq!(
1214 entries_after.len(),
1215 entries_before.len(),
1216 "RocksDB entry count should be unchanged when sf_tip == checkpoint"
1217 );
1218
1219 for (before, after) in entries_before.iter().zip(entries_after.iter()) {
1221 assert_eq!(before.0.key, after.0.key, "Entry key should be unchanged");
1222 assert_eq!(
1223 before.0.highest_block_number, after.0.highest_block_number,
1224 "Entry highest_block_number should be unchanged"
1225 );
1226 assert_eq!(before.1, after.1, "Entry block list should be unchanged");
1227 }
1228 }
1229
1230 #[test]
1242 fn test_check_consistency_storages_history_heals_via_changesets_large_range() {
1243 use alloy_primitives::U256;
1244 use reth_db_api::models::StorageBeforeTx;
1245
1246 const TOTAL_BLOCKS: u64 = 15_000;
1247 const CHECKPOINT_BLOCK: u64 = 5_000;
1248
1249 let temp_dir = TempDir::new().unwrap();
1250 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1251
1252 let factory = create_test_provider_factory();
1253 factory.set_storage_settings_cache(StorageSettings::v2());
1254
1255 #[inline]
1257 fn make_address(block_num: u64) -> Address {
1258 let mut addr_bytes = [0u8; 20];
1259 addr_bytes[0..8].copy_from_slice(&block_num.to_le_bytes());
1260 Address::from(addr_bytes)
1261 }
1262
1263 #[inline]
1265 fn make_slot(block_num: u64) -> B256 {
1266 let mut slot_bytes = [0u8; 32];
1267 slot_bytes[0..8].copy_from_slice(&block_num.to_le_bytes());
1268 B256::from(slot_bytes)
1269 }
1270
1271 {
1274 let sf_provider = factory.static_file_provider();
1275 let mut writer =
1276 sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1277
1278 let mut changeset = Vec::with_capacity(1);
1280
1281 for block_num in 0..TOTAL_BLOCKS {
1282 changeset.clear();
1283 changeset.push(StorageBeforeTx {
1284 address: make_address(block_num),
1285 key: make_slot(block_num),
1286 value: U256::from(block_num),
1287 });
1288
1289 writer.append_storage_changeset(changeset.clone(), block_num).unwrap();
1290 }
1291
1292 writer.commit().unwrap();
1293 }
1294
1295 {
1297 let sf_provider = factory.static_file_provider();
1298 let highest = sf_provider
1299 .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
1300 .unwrap();
1301 assert_eq!(highest, TOTAL_BLOCKS - 1, "Static files should have blocks 0..14999");
1302 }
1303
1304 {
1306 let provider = factory.database_provider_rw().unwrap();
1307 provider
1308 .save_stage_checkpoint(
1309 StageId::IndexStorageHistory,
1310 StageCheckpoint::new(CHECKPOINT_BLOCK),
1311 )
1312 .unwrap();
1313 provider.commit().unwrap();
1314 }
1315
1316 for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
1319 let key =
1320 StorageShardedKey::new(make_address(block_num), make_slot(block_num), block_num);
1321 let block_list = BlockNumberList::new_pre_sorted([block_num]);
1322 rocksdb.put::<tables::StoragesHistory>(key, &block_list).unwrap();
1323 }
1324
1325 let count_before: usize = rocksdb.iter::<tables::StoragesHistory>().unwrap().count();
1327 assert_eq!(
1328 count_before,
1329 (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize,
1330 "Should have {} stale entries before healing",
1331 TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1
1332 );
1333
1334 let provider = factory.database_provider_ro().unwrap();
1336 let result = rocksdb.check_consistency(&provider).unwrap();
1337 assert_eq!(result, None, "Should heal via changesets, no unwind needed");
1338
1339 let mut remaining_stale = 0;
1342 for result in rocksdb.iter::<tables::StoragesHistory>().unwrap() {
1343 let (key, _) = result.unwrap();
1344 if key.sharded_key.highest_block_number > CHECKPOINT_BLOCK {
1345 remaining_stale += 1;
1346 }
1347 }
1348 assert_eq!(
1349 remaining_stale, 0,
1350 "All stale entries (block > {}) should be pruned",
1351 CHECKPOINT_BLOCK
1352 );
1353 }
1354
1355 #[test]
1359 fn test_check_consistency_storages_history_preserves_checkpoint_block() {
1360 use alloy_primitives::U256;
1361 use reth_db_api::models::StorageBeforeTx;
1362
1363 const CHECKPOINT_BLOCK: u64 = 100;
1364 const SF_TIP: u64 = 200;
1365
1366 let temp_dir = TempDir::new().unwrap();
1367 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1368
1369 let factory = create_test_provider_factory();
1370 factory.set_storage_settings_cache(StorageSettings::v2());
1371
1372 let checkpoint_addr = Address::repeat_byte(0xAA);
1373 let checkpoint_slot = B256::repeat_byte(0xBB);
1374 let stale_addr = Address::repeat_byte(0xCC);
1375 let stale_slot = B256::repeat_byte(0xDD);
1376
1377 {
1379 let sf_provider = factory.static_file_provider();
1380 let mut writer =
1381 sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1382
1383 for block_num in 0..=SF_TIP {
1384 let changeset = if block_num == CHECKPOINT_BLOCK {
1385 vec![StorageBeforeTx {
1386 address: checkpoint_addr,
1387 key: checkpoint_slot,
1388 value: U256::from(block_num),
1389 }]
1390 } else if block_num > CHECKPOINT_BLOCK {
1391 vec![StorageBeforeTx {
1392 address: stale_addr,
1393 key: stale_slot,
1394 value: U256::from(block_num),
1395 }]
1396 } else {
1397 vec![StorageBeforeTx {
1398 address: Address::ZERO,
1399 key: B256::ZERO,
1400 value: U256::ZERO,
1401 }]
1402 };
1403 writer.append_storage_changeset(changeset, block_num).unwrap();
1404 }
1405 writer.commit().unwrap();
1406 }
1407
1408 {
1410 let provider = factory.database_provider_rw().unwrap();
1411 provider
1412 .save_stage_checkpoint(
1413 StageId::IndexStorageHistory,
1414 StageCheckpoint::new(CHECKPOINT_BLOCK),
1415 )
1416 .unwrap();
1417 provider.commit().unwrap();
1418 }
1419
1420 let checkpoint_key =
1422 StorageShardedKey::new(checkpoint_addr, checkpoint_slot, CHECKPOINT_BLOCK);
1423 let checkpoint_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]);
1424 rocksdb.put::<tables::StoragesHistory>(checkpoint_key.clone(), &checkpoint_list).unwrap();
1425
1426 let stale_key = StorageShardedKey::new(stale_addr, stale_slot, SF_TIP);
1428 let stale_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]);
1429 rocksdb.put::<tables::StoragesHistory>(stale_key.clone(), &stale_list).unwrap();
1430
1431 let provider = factory.database_provider_ro().unwrap();
1433 let result = rocksdb.check_consistency(&provider).unwrap();
1434 assert_eq!(result, None, "Should heal without unwind");
1435
1436 let preserved = rocksdb.get::<tables::StoragesHistory>(checkpoint_key).unwrap();
1438 assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");
1439
1440 let stale = rocksdb.get::<tables::StoragesHistory>(stale_key).unwrap();
1442 assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
1443 }
1444
1445 #[test]
1459 fn test_check_consistency_accounts_history_heals_via_changesets_large_range() {
1460 use reth_db::models::AccountBeforeTx;
1461 use reth_db_api::models::ShardedKey;
1462 use reth_static_file_types::StaticFileSegment;
1463
1464 let temp_dir = TempDir::new().unwrap();
1465 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1466
1467 let factory = create_test_provider_factory();
1469 factory.set_storage_settings_cache(StorageSettings::v2());
1470
1471 const TOTAL_BLOCKS: u64 = 15_000;
1472 const CHECKPOINT_BLOCK: u64 = 5_000;
1473
1474 #[inline]
1476 fn make_address(block_num: u64) -> Address {
1477 let mut addr = Address::ZERO;
1478 addr.0[0..8].copy_from_slice(&block_num.to_le_bytes());
1479 addr
1480 }
1481
1482 {
1484 let sf_provider = factory.static_file_provider();
1485 let mut writer =
1486 sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1487
1488 let mut changeset = Vec::with_capacity(1);
1490
1491 for block_num in 0..TOTAL_BLOCKS {
1492 changeset.clear();
1493 changeset.push(AccountBeforeTx { address: make_address(block_num), info: None });
1494 writer.append_account_changeset(changeset.clone(), block_num).unwrap();
1495 }
1496
1497 writer.commit().unwrap();
1498 }
1499
1500 for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
1504 let key = ShardedKey::new(make_address(block_num), block_num);
1505 let block_list = BlockNumberList::new_pre_sorted([block_num]);
1506 rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
1507 }
1508
1509 for block_num in [100u64, 500, 1000, 2500, 5000] {
1511 let key = ShardedKey::new(make_address(block_num), block_num);
1512 let block_list = BlockNumberList::new_pre_sorted([block_num]);
1513 rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
1514 }
1515
1516 let entries_before: usize = rocksdb.iter::<tables::AccountsHistory>().unwrap().count();
1518 let stale_count = (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize;
1519 let valid_count = 5usize;
1520 assert_eq!(
1521 entries_before,
1522 stale_count + valid_count,
1523 "Should have {} stale + {} valid entries before healing",
1524 stale_count,
1525 valid_count
1526 );
1527
1528 {
1530 let provider = factory.database_provider_rw().unwrap();
1531 provider
1532 .save_stage_checkpoint(
1533 StageId::IndexAccountHistory,
1534 StageCheckpoint::new(CHECKPOINT_BLOCK),
1535 )
1536 .unwrap();
1537 provider.commit().unwrap();
1538 }
1539
1540 let provider = factory.database_provider_ro().unwrap();
1541
1542 let sf_tip = provider
1544 .static_file_provider()
1545 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
1546 .unwrap();
1547 assert_eq!(sf_tip, TOTAL_BLOCKS - 1, "Static file tip should be 14999");
1548 assert!(sf_tip > CHECKPOINT_BLOCK, "sf_tip should be > checkpoint to trigger healing");
1549
1550 let result = rocksdb.check_consistency(&provider).unwrap();
1552 assert_eq!(result, None, "Healing should succeed without requiring unwind");
1553
1554 let mut remaining_stale = 0;
1557 for result in rocksdb.iter::<tables::AccountsHistory>().unwrap() {
1558 let (key, _) = result.unwrap();
1559 if key.highest_block_number > CHECKPOINT_BLOCK && key.highest_block_number != u64::MAX {
1560 remaining_stale += 1;
1561 }
1562 }
1563 assert_eq!(
1564 remaining_stale, 0,
1565 "All stale entries (block > {}) should be pruned",
1566 CHECKPOINT_BLOCK
1567 );
1568 }
1569
1570 #[test]
1572 fn test_check_consistency_accounts_history_preserves_checkpoint_block() {
1573 use reth_db::models::AccountBeforeTx;
1574 use reth_db_api::models::ShardedKey;
1575
1576 const CHECKPOINT_BLOCK: u64 = 100;
1577 const SF_TIP: u64 = 200;
1578
1579 let temp_dir = TempDir::new().unwrap();
1580 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1581
1582 let factory = create_test_provider_factory();
1583 factory.set_storage_settings_cache(StorageSettings::v2());
1584
1585 let checkpoint_addr = Address::repeat_byte(0xAA);
1586 let stale_addr = Address::repeat_byte(0xCC);
1587
1588 {
1590 let sf_provider = factory.static_file_provider();
1591 let mut writer =
1592 sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1593
1594 for block_num in 0..=SF_TIP {
1595 let changeset = if block_num == CHECKPOINT_BLOCK {
1596 vec![AccountBeforeTx { address: checkpoint_addr, info: None }]
1597 } else if block_num > CHECKPOINT_BLOCK {
1598 vec![AccountBeforeTx { address: stale_addr, info: None }]
1599 } else {
1600 vec![AccountBeforeTx { address: Address::ZERO, info: None }]
1601 };
1602 writer.append_account_changeset(changeset, block_num).unwrap();
1603 }
1604 writer.commit().unwrap();
1605 }
1606
1607 {
1609 let provider = factory.database_provider_rw().unwrap();
1610 provider
1611 .save_stage_checkpoint(
1612 StageId::IndexAccountHistory,
1613 StageCheckpoint::new(CHECKPOINT_BLOCK),
1614 )
1615 .unwrap();
1616 provider.commit().unwrap();
1617 }
1618
1619 let checkpoint_key = ShardedKey::new(checkpoint_addr, CHECKPOINT_BLOCK);
1621 let checkpoint_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]);
1622 rocksdb.put::<tables::AccountsHistory>(checkpoint_key.clone(), &checkpoint_list).unwrap();
1623
1624 let stale_key = ShardedKey::new(stale_addr, SF_TIP);
1626 let stale_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]);
1627 rocksdb.put::<tables::AccountsHistory>(stale_key.clone(), &stale_list).unwrap();
1628
1629 let provider = factory.database_provider_ro().unwrap();
1631 let result = rocksdb.check_consistency(&provider).unwrap();
1632 assert_eq!(result, None, "Should heal without unwind");
1633
1634 let preserved = rocksdb.get::<tables::AccountsHistory>(checkpoint_key).unwrap();
1636 assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");
1637
1638 let stale = rocksdb.get::<tables::AccountsHistory>(stale_key).unwrap();
1640 assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
1641 }
1642
1643 #[test]
1644 fn test_check_consistency_storages_history_sf_tip_equals_checkpoint_no_action() {
1645 use alloy_primitives::U256;
1646 use reth_db::models::StorageBeforeTx;
1647 use reth_static_file_types::StaticFileSegment;
1648
1649 let temp_dir = TempDir::new().unwrap();
1650 let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1651
1652 let key1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
1654 let key2 = StorageShardedKey::new(Address::random(), B256::random(), 80);
1655 let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
1656 let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 80]);
1657 rocksdb.put::<tables::StoragesHistory>(key1, &block_list1).unwrap();
1658 rocksdb.put::<tables::StoragesHistory>(key2, &block_list2).unwrap();
1659
1660 let entries_before: Vec<_> =
1662 rocksdb.iter::<tables::StoragesHistory>().unwrap().map(|r| r.unwrap()).collect();
1663
1664 let factory = create_test_provider_factory();
1666 factory.set_storage_settings_cache(StorageSettings::v2());
1667
1668 {
1670 let sf_provider = factory.static_file_provider();
1671 let mut writer =
1672 sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1673
1674 for block_num in 0..=100u64 {
1675 let changeset = vec![StorageBeforeTx {
1676 address: Address::ZERO,
1677 key: B256::with_last_byte(block_num as u8),
1678 value: U256::from(block_num),
1679 }];
1680 writer.append_storage_changeset(changeset, block_num).unwrap();
1681 }
1682 writer.commit().unwrap();
1683 }
1684
1685 {
1687 let provider = factory.database_provider_rw().unwrap();
1688 provider
1689 .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
1690 .unwrap();
1691 provider.commit().unwrap();
1692 }
1693
1694 let provider = factory.database_provider_ro().unwrap();
1695
1696 let sf_tip = provider
1698 .static_file_provider()
1699 .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
1700 .unwrap();
1701 assert_eq!(sf_tip, 100, "Static file tip should be 100");
1702
1703 let result = rocksdb.check_consistency(&provider).unwrap();
1705 assert_eq!(result, None, "sf_tip == checkpoint should not require unwind");
1706
1707 let entries_after: Vec<_> =
1709 rocksdb.iter::<tables::StoragesHistory>().unwrap().map(|r| r.unwrap()).collect();
1710
1711 assert_eq!(
1712 entries_after.len(),
1713 entries_before.len(),
1714 "RocksDB entry count should be unchanged when sf_tip == checkpoint"
1715 );
1716
1717 for (before, after) in entries_before.iter().zip(entries_after.iter()) {
1719 assert_eq!(before.0, after.0, "Entry key should be unchanged");
1720 assert_eq!(before.1, after.1, "Entry block list should be unchanged");
1721 }
1722 }
1723}