1use super::RocksDBProvider;
8use crate::StaticFileProviderFactory;
9use alloy_consensus::transaction::TxHashRef;
10use alloy_primitives::BlockNumber;
11use rayon::prelude::*;
12use reth_chainspec::{ChainSpecProvider, EthChainSpec};
13use reth_db::models::{storage_sharded_key::StorageShardedKey, ShardedKey};
14use reth_db_api::tables;
15use reth_stages_types::StageId;
16use reth_static_file_types::StaticFileSegment;
17use reth_storage_api::{
18 BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StageCheckpointReader,
19 StorageChangeSetReader, StorageSettingsCache, TransactionsProvider,
20};
21use reth_storage_errors::provider::ProviderResult;
22use std::collections::HashSet;
23
/// Maximum number of blocks whose changesets are loaded in a single iteration while healing
/// the `StoragesHistory` and `AccountsHistory` tables.
const HEAL_HISTORY_BATCH_SIZE: u64 = 10_000;
27
28impl RocksDBProvider {
29 pub fn check_consistency<Provider>(
51 &self,
52 provider: &Provider,
53 ) -> ProviderResult<Option<BlockNumber>>
54 where
55 Provider: DBProvider
56 + StageCheckpointReader
57 + StorageSettingsCache
58 + StaticFileProviderFactory
59 + BlockBodyIndicesProvider
60 + StorageChangeSetReader
61 + ChangeSetReader
62 + TransactionsProvider
63 + ChainSpecProvider,
64 {
65 let mut unwind_target: Option<BlockNumber> = None;
66
67 if provider.cached_storage_settings().storage_v2 &&
69 let Some(target) = self.heal_transaction_hash_numbers(provider)?
70 {
71 unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
72 }
73
74 if provider.cached_storage_settings().storage_v2 &&
76 let Some(target) = self.heal_storages_history(provider)?
77 {
78 unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
79 }
80
81 if provider.cached_storage_settings().storage_v2 &&
83 let Some(target) = self.heal_accounts_history(provider)?
84 {
85 unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
86 }
87
88 Ok(unwind_target)
89 }
90
91 fn heal_transaction_hash_numbers<Provider>(
98 &self,
99 provider: &Provider,
100 ) -> ProviderResult<Option<BlockNumber>>
101 where
102 Provider: DBProvider
103 + StageCheckpointReader
104 + StaticFileProviderFactory
105 + BlockBodyIndicesProvider
106 + TransactionsProvider,
107 {
108 let checkpoint = provider
109 .get_stage_checkpoint(StageId::TransactionLookup)?
110 .map(|cp| cp.block_number)
111 .unwrap_or(0);
112
113 let sf_tip = provider
114 .static_file_provider()
115 .get_highest_static_file_block(StaticFileSegment::Transactions)
116 .unwrap_or(0);
117
118 if checkpoint == 0 {
120 if self.first::<tables::TransactionHashNumbers>()?.is_some() {
121 tracing::info!(
122 target: "reth::providers::rocksdb",
123 "TransactionHashNumbers: checkpoint is 0, clearing stale data"
124 );
125 self.clear::<tables::TransactionHashNumbers>()?;
126 }
127
128 return Ok(None);
129 }
130
131 if sf_tip < checkpoint {
132 tracing::warn!(
136 target: "reth::providers::rocksdb",
137 sf_tip,
138 checkpoint,
139 "TransactionHashNumbers: static file tip behind checkpoint, unwind needed"
140 );
141 return Ok(Some(sf_tip));
142 }
143
144 if sf_tip == checkpoint {
146 return Ok(None);
147 }
148
149 let sf_tip_end_tx = provider
151 .static_file_provider()
152 .get_highest_static_file_tx(StaticFileSegment::Transactions)
153 .unwrap_or(0);
154
155 let checkpoint_next_tx = provider
157 .block_body_indices(checkpoint)?
158 .map(|indices| indices.next_tx_num())
159 .unwrap_or(0);
160
161 if sf_tip_end_tx < checkpoint_next_tx {
162 tracing::warn!(
166 target: "reth::providers::rocksdb",
167 sf_tip_end_tx,
168 checkpoint_next_tx,
169 checkpoint,
170 sf_tip,
171 "TransactionHashNumbers: static file tx tip behind checkpoint, unwind needed"
172 );
173 return Ok(Some(sf_tip));
174 }
175
176 tracing::info!(
177 target: "reth::providers::rocksdb",
178 checkpoint,
179 sf_tip,
180 checkpoint_next_tx,
181 sf_tip_end_tx,
182 "TransactionHashNumbers: healing via transaction ranges"
183 );
184
185 const BATCH_SIZE: u64 = 10_000;
186 let mut batch_start = checkpoint_next_tx;
187
188 while batch_start <= sf_tip_end_tx {
189 let batch_end = batch_start.saturating_add(BATCH_SIZE - 1).min(sf_tip_end_tx);
190
191 tracing::debug!(
192 target: "reth::providers::rocksdb",
193 batch_start,
194 batch_end,
195 "Pruning TransactionHashNumbers batch"
196 );
197
198 self.prune_transaction_hash_numbers_in_range(provider, batch_start..=batch_end)?;
199
200 batch_start = batch_end.saturating_add(1);
201 }
202
203 Ok(None)
204 }
205
206 fn prune_transaction_hash_numbers_in_range<Provider>(
219 &self,
220 provider: &Provider,
221 tx_range: std::ops::RangeInclusive<u64>,
222 ) -> ProviderResult<()>
223 where
224 Provider: TransactionsProvider,
225 {
226 if tx_range.is_empty() {
227 return Ok(());
228 }
229
230 let hashes: Vec<_> = provider
232 .transactions_by_tx_range(tx_range.clone())?
233 .into_par_iter()
234 .map(|tx| *tx.tx_hash())
235 .collect();
236
237 if !hashes.is_empty() {
238 tracing::info!(
239 target: "reth::providers::rocksdb",
240 deleted_count = hashes.len(),
241 tx_range_start = *tx_range.start(),
242 tx_range_end = *tx_range.end(),
243 "Pruning TransactionHashNumbers entries by tx range"
244 );
245
246 let mut batch = self.batch();
247 for hash in hashes {
248 batch.delete::<tables::TransactionHashNumbers>(hash)?;
249 }
250 batch.commit()?;
251 }
252
253 Ok(())
254 }
255
256 fn heal_storages_history<Provider>(
261 &self,
262 provider: &Provider,
263 ) -> ProviderResult<Option<BlockNumber>>
264 where
265 Provider: DBProvider
266 + StageCheckpointReader
267 + StaticFileProviderFactory
268 + StorageChangeSetReader
269 + ChainSpecProvider,
270 {
271 let checkpoint = provider
272 .get_stage_checkpoint(StageId::IndexStorageHistory)?
273 .map(|cp| cp.block_number)
274 .unwrap_or(0);
275
276 let sf_tip = provider
277 .static_file_provider()
278 .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
279 .unwrap_or(0);
280
281 if sf_tip < checkpoint {
282 tracing::warn!(
286 target: "reth::providers::rocksdb",
287 sf_tip,
288 checkpoint,
289 "StoragesHistory: static file tip behind checkpoint, unwind needed"
290 );
291 return Ok(Some(sf_tip));
292 }
293
294 if sf_tip == checkpoint {
295 return Ok(None);
296 }
297
298 if checkpoint == 0 {
300 tracing::info!(
301 target: "reth::providers::rocksdb",
302 "StoragesHistory: checkpoint is 0, clearing stale data"
303 );
304 self.clear::<tables::StoragesHistory>()?;
305
306 let chain_spec = provider.chain_spec();
307 let genesis = chain_spec.genesis();
308 let list = tables::BlockNumberList::new([0]).expect("single block always fits");
309 for (addr, account) in &genesis.alloc {
310 if let Some(storage) = &account.storage {
311 for key in storage.keys() {
312 self.put::<tables::StoragesHistory>(
313 StorageShardedKey::last(*addr, *key),
314 &list,
315 )?;
316 }
317 }
318 }
319
320 return Ok(None);
321 }
322
323 let total_blocks = sf_tip - checkpoint;
324 tracing::info!(
325 target: "reth::providers::rocksdb",
326 checkpoint,
327 sf_tip,
328 total_blocks,
329 "StoragesHistory: healing via changesets"
330 );
331
332 let mut batch_start = checkpoint + 1;
333 let mut batch_num = 0u64;
334 let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
335
336 while batch_start <= sf_tip {
337 let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
338 batch_num += 1;
339
340 let changesets = provider.storage_changesets_range(batch_start..=batch_end)?;
341
342 let unique_keys: HashSet<_> = changesets
343 .into_iter()
344 .map(|(block_addr, entry)| (block_addr.address(), entry.key, checkpoint + 1))
345 .collect();
346 let indices: Vec<_> = unique_keys.into_iter().collect();
347
348 if !indices.is_empty() {
349 tracing::info!(
350 target: "reth::providers::rocksdb",
351 batch_num,
352 total_batches,
353 batch_start,
354 batch_end,
355 indices_count = indices.len(),
356 "StoragesHistory: unwinding batch"
357 );
358
359 let batch = self.unwind_storage_history_indices(&indices)?;
360 self.commit_batch(batch)?;
361 }
362
363 batch_start = batch_end + 1;
364 }
365
366 Ok(None)
367 }
368
369 fn heal_accounts_history<Provider>(
374 &self,
375 provider: &Provider,
376 ) -> ProviderResult<Option<BlockNumber>>
377 where
378 Provider: DBProvider
379 + StageCheckpointReader
380 + StaticFileProviderFactory
381 + ChangeSetReader
382 + ChainSpecProvider,
383 {
384 let checkpoint = provider
385 .get_stage_checkpoint(StageId::IndexAccountHistory)?
386 .map(|cp| cp.block_number)
387 .unwrap_or(0);
388
389 let sf_tip = provider
390 .static_file_provider()
391 .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
392 .unwrap_or(0);
393
394 if sf_tip < checkpoint {
395 tracing::warn!(
399 target: "reth::providers::rocksdb",
400 sf_tip,
401 checkpoint,
402 "AccountsHistory: static file tip behind checkpoint, unwind needed"
403 );
404 return Ok(Some(sf_tip));
405 }
406
407 if sf_tip == checkpoint {
408 return Ok(None);
409 }
410
411 if checkpoint == 0 {
413 tracing::info!(
414 target: "reth::providers::rocksdb",
415 "AccountsHistory: checkpoint is 0, clearing stale data"
416 );
417 self.clear::<tables::AccountsHistory>()?;
418
419 let chain_spec = provider.chain_spec();
420 let genesis = chain_spec.genesis();
421 let list = tables::BlockNumberList::new([0]).expect("single block always fits");
422 for addr in genesis.alloc.keys() {
423 self.put::<tables::AccountsHistory>(ShardedKey::last(*addr), &list)?;
424 }
425
426 return Ok(None);
427 }
428
429 let total_blocks = sf_tip - checkpoint;
430 tracing::info!(
431 target: "reth::providers::rocksdb",
432 checkpoint,
433 sf_tip,
434 total_blocks,
435 "AccountsHistory: healing via changesets"
436 );
437
438 let mut batch_start = checkpoint + 1;
439 let mut batch_num = 0u64;
440 let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
441
442 while batch_start <= sf_tip {
443 let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
444 batch_num += 1;
445
446 let changesets = provider.account_changesets_range(batch_start..=batch_end)?;
447
448 let mut addresses = HashSet::with_capacity(changesets.len());
449 addresses.extend(changesets.iter().map(|(_, cs)| cs.address));
450 let unwind_from = checkpoint + 1;
451 let indices: Vec<_> = addresses.into_iter().map(|addr| (addr, unwind_from)).collect();
452
453 if !indices.is_empty() {
454 tracing::info!(
455 target: "reth::providers::rocksdb",
456 batch_num,
457 total_batches,
458 batch_start,
459 batch_end,
460 indices_count = indices.len(),
461 "AccountsHistory: unwinding batch"
462 );
463
464 let batch = self.unwind_account_history_indices(&indices)?;
465 self.commit_batch(batch)?;
466 }
467
468 batch_start = batch_end + 1;
469 }
470
471 Ok(None)
472 }
473}
474
475#[cfg(test)]
476mod tests {
477 use std::sync::Arc;
478
479 use super::*;
480 use crate::{
481 init::insert_genesis_history,
482 providers::{rocksdb::RocksDBBuilder, static_file::StaticFileWriter},
483 test_utils::{create_test_provider_factory, create_test_provider_factory_with_chain_spec},
484 BlockWriter, DatabaseProviderFactory, RocksDBProviderFactory, StageCheckpointWriter,
485 TransactionsProvider,
486 };
487 use alloy_primitives::{Address, B256};
488 use reth_chainspec::MAINNET;
489 use reth_db::cursor::{DbCursorRO, DbCursorRW};
490 use reth_db_api::{
491 models::{storage_sharded_key::StorageShardedKey, StorageSettings},
492 tables::{self, BlockNumberList},
493 transaction::DbTxMut,
494 };
495 use reth_stages_types::StageCheckpoint;
496 use reth_testing_utils::generators::{self, BlockRangeParams};
497 use tempfile::TempDir;
498
/// `first` and `last` both return `None` on a table that was never written to.
#[test]
fn test_first_last_empty_rocksdb() {
    let temp_dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(temp_dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .with_table::<tables::StoragesHistory>()
        .build()
        .unwrap();

    let first_entry = db.first::<tables::TransactionHashNumbers>().unwrap();
    let last_entry = db.last::<tables::TransactionHashNumbers>().unwrap();

    assert!(first_entry.is_none());
    assert!(last_entry.is_none());
}
515
/// With exactly one entry in the table, `first` and `last` must both return that entry.
#[test]
fn test_first_last_with_data() {
    let temp_dir = TempDir::new().unwrap();
    let provider = RocksDBBuilder::new(temp_dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .build()
        .unwrap();

    // Insert a single hash -> tx number mapping.
    let tx_hash = B256::from([1u8; 32]);
    provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();

    // The test name covers `first` as well, so check it in addition to `last`.
    let (first_key, first_value) = provider
        .first::<tables::TransactionHashNumbers>()
        .unwrap()
        .expect("first entry should exist");
    assert_eq!(first_key, tx_hash);
    assert_eq!(first_value, 100);

    let (last_key, last_value) = provider
        .last::<tables::TransactionHashNumbers>()
        .unwrap()
        .expect("last entry should exist");
    assert_eq!(last_key, tx_hash);
    assert_eq!(last_value, 100);
}
533
/// With empty RocksDB tables and no stage checkpoints saved, the consistency check asks for
/// no unwind.
#[test]
fn test_check_consistency_empty_rocksdb_no_checkpoint_is_ok() {
    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    // The check only does work when storage v2 is enabled.
    factory.set_storage_settings_cache(StorageSettings::v2());

    let provider_ro = factory.database_provider_ro().unwrap();
    let unwind_target = rocksdb.check_consistency(&provider_ro).unwrap();

    assert_eq!(unwind_target, None);
}
549
/// With no checkpoints and no data, each healer returns `None` and leaves its table empty.
#[test]
fn test_check_consistency_checkpoint_zero_empty_rocksdb_returns_early() {
    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let provider = factory.database_provider_ro().unwrap();

    // Transaction hash lookup table.
    let tx_outcome = rocksdb.heal_transaction_hash_numbers(&provider).unwrap();
    assert_eq!(tx_outcome, None, "TransactionHashNumbers should return early at checkpoint 0");
    let first_tx_entry = rocksdb.first::<tables::TransactionHashNumbers>().unwrap();
    assert!(first_tx_entry.is_none());

    // Storage history table.
    let storages_outcome = rocksdb.heal_storages_history(&provider).unwrap();
    assert_eq!(storages_outcome, None, "StoragesHistory should return early at checkpoint 0");

    // Account history table.
    let accounts_outcome = rocksdb.heal_accounts_history(&provider).unwrap();
    assert_eq!(accounts_outcome, None, "AccountsHistory should return early at checkpoint 0");
    let account_entries = rocksdb.iter::<tables::AccountsHistory>().unwrap().count();
    assert_eq!(account_entries, 0);
}
577
/// A `TransactionLookup` checkpoint of 100 with no static file data yields unwind target 0.
#[test]
fn test_check_consistency_empty_rocksdb_with_checkpoint_is_first_run() {
    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Persist the checkpoint; nothing is written to static files.
    let provider_rw = factory.database_provider_rw().unwrap();
    provider_rw
        .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(100))
        .unwrap();
    provider_rw.commit().unwrap();

    let provider_ro = factory.database_provider_ro().unwrap();
    let unwind_target = rocksdb.check_consistency(&provider_ro).unwrap();

    assert_eq!(
        unwind_target,
        Some(0),
        "Static file tip (0) behind checkpoint (100) triggers unwind"
    );
}
603
/// When all checkpoints are 0 but RocksDB already holds transaction hashes, the consistency
/// check removes them and requests no unwind.
#[test]
fn test_check_consistency_checkpoint_zero_with_rocksdb_data_prunes_all() {
    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let mut rng = generators::rng();
    let block_params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
    let blocks = generators::random_block_range(&mut rng, 0..=2, block_params);

    // Insert the blocks and mirror every tx hash into RocksDB under its running tx number.
    let mut tx_hashes = Vec::new();
    {
        let provider = factory.database_provider_rw().unwrap();
        for block in &blocks {
            provider.insert_block(&block.clone().try_recover().expect("recover block")).unwrap();
            for tx in &block.body().transactions {
                let hash = *tx.tx_hash();
                let tx_num = tx_hashes.len() as u64;
                rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
                tx_hashes.push(hash);
            }
        }
        provider.commit().unwrap();
    }

    // Explicitly record checkpoint 0 for every stage the check looks at.
    {
        let provider = factory.database_provider_rw().unwrap();
        for stage_id in [
            StageId::TransactionLookup,
            StageId::IndexStorageHistory,
            StageId::IndexAccountHistory,
        ] {
            provider.save_stage_checkpoint(stage_id, StageCheckpoint::new(0)).unwrap();
        }
        provider.commit().unwrap();
    }

    // Sanity check: RocksDB holds data before the check runs.
    assert!(rocksdb.last::<tables::TransactionHashNumbers>().unwrap().is_some());

    let provider = factory.database_provider_ro().unwrap();
    let result = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(result, None, "Should heal by clearing, no unwind needed");

    for hash in tx_hashes.iter().copied() {
        let entry = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
        assert!(entry.is_none(), "RocksDB should be empty after pruning");
    }
}
673
/// An `IndexStorageHistory` checkpoint of 100 with no static file data yields unwind
/// target 0.
#[test]
fn test_check_consistency_storages_history_empty_with_checkpoint_is_first_run() {
    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Persist the checkpoint; nothing is written to static files.
    let provider_rw = factory.database_provider_rw().unwrap();
    provider_rw
        .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
        .unwrap();
    provider_rw.commit().unwrap();

    let provider_ro = factory.database_provider_ro().unwrap();
    let unwind_target = rocksdb.check_consistency(&provider_ro).unwrap();

    assert_eq!(unwind_target, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
}
700
/// Genesis `StoragesHistory` entries survive the storage-history healer when no checkpoint
/// and no static file data exist.
#[test]
fn test_check_consistency_storages_history_preserves_genesis_entries_at_checkpoint_zero(
) -> eyre::Result<()> {
    let mut chain_spec = MAINNET.clone();
    // Give the first genesis account one random storage slot so that there is something to
    // index.
    {
        let genesis = &mut Arc::make_mut(&mut chain_spec).genesis;
        let mut first_account = genesis.alloc.first_entry().unwrap();
        first_account.get_mut().storage = Some(From::from([(B256::random(), B256::random())]));
    }

    let factory = create_test_provider_factory_with_chain_spec(chain_spec);
    let rocksdb = factory.rocksdb_provider();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Write the genesis history entries.
    let provider_rw = factory.database_provider_rw().unwrap();
    let spec = factory.chain_spec();
    insert_genesis_history(&provider_rw, spec.genesis.alloc.iter())?;
    provider_rw.commit()?;

    let provider = factory.database_provider_ro().unwrap();
    let outcome = rocksdb.heal_storages_history(&provider).unwrap();
    assert_eq!(outcome, None, "Should skip healing when only genesis entries present");

    let remaining_entries = rocksdb.iter::<tables::StoragesHistory>().unwrap().count();
    assert!(remaining_entries > 0, "Genesis entries should be preserved");

    Ok(())
}
733
/// A `TransactionLookup` checkpoint (10) ahead of the inserted blocks (0..=2) makes the check
/// return the static file tip (2) as unwind target.
#[test]
fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() {
    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let mut rng = generators::rng();
    let block_params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
    let blocks = generators::random_block_range(&mut rng, 0..=2, block_params);

    // Insert the blocks and mirror every tx hash into RocksDB under its running tx number.
    {
        let provider = factory.database_provider_rw().unwrap();
        let mut next_tx_num = 0u64;
        for block in &blocks {
            provider.insert_block(&block.clone().try_recover().expect("recover block")).unwrap();
            for tx in &block.body().transactions {
                rocksdb
                    .put::<tables::TransactionHashNumbers>(*tx.tx_hash(), &next_tx_num)
                    .unwrap();
                next_tx_num += 1;
            }
        }
        provider.commit().unwrap();
    }

    // Only the transaction lookup checkpoint is ahead of the data; the history stages stay
    // at 0.
    {
        let provider = factory.database_provider_rw().unwrap();
        for (stage_id, block_number) in [
            (StageId::TransactionLookup, 10),
            (StageId::IndexStorageHistory, 0),
            (StageId::IndexAccountHistory, 0),
        ] {
            provider.save_stage_checkpoint(stage_id, StageCheckpoint::new(block_number)).unwrap();
        }
        provider.commit().unwrap();
    }

    let provider = factory.database_provider_ro().unwrap();
    let unwind_target = rocksdb.check_consistency(&provider).unwrap();

    assert_eq!(unwind_target, Some(2), "sf_tip < checkpoint requires unwind to sf_tip");
}
790
/// RocksDB holds tx hashes for blocks 0..=5 while the `TransactionLookup` checkpoint is 2:
/// the check must prune the hashes beyond the checkpoint without requesting an unwind.
#[test]
fn test_check_consistency_rocksdb_ahead_of_checkpoint_prunes_excess() {
    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let mut rng = generators::rng();
    // Blocks 0..=5; the assertions at the end treat the first 6 hashes as belonging to
    // blocks 0..=2, i.e. two transactions per block.
    let blocks = generators::random_block_range(
        &mut rng,
        0..=5,
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
    );

    // Hashes in insertion order; the index in this vector is the tx number stored in RocksDB.
    let mut tx_hashes = Vec::new();
    let mut tx_count = 0u64;
    {
        let provider = factory.database_provider_rw().unwrap();
        for block in &blocks {
            provider
                .insert_block(&block.clone().try_recover().expect("recover block"))
                .unwrap();
            for tx in &block.body().transactions {
                let hash = *tx.tx_hash();
                tx_hashes.push(hash);
                rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                tx_count += 1;
            }
        }
        provider.commit().unwrap();
    }

    {
        let provider = factory.database_provider_rw().unwrap();
        let mut cursor = provider.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();
        // Collect first, delete afterwards: the walker borrows the cursor while it is alive.
        let mut to_delete = Vec::new();
        let mut walker = cursor.walk(Some(0)).unwrap();
        while let Some((tx_num, block_num)) = walker.next().transpose().unwrap() {
            // Drop the `TransactionBlocks` rows that point at blocks above the checkpoint.
            if block_num > 2 {
                to_delete.push(tx_num);
            }
        }
        drop(walker);
        for tx_num in to_delete {
            cursor.seek_exact(tx_num).unwrap();
            cursor.delete_current().unwrap();
        }

        // Transaction lookup has only processed up to block 2.
        provider
            .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
            .unwrap();
        // History stages stay at checkpoint 0.
        provider
            .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
            .unwrap();
        provider
            .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
            .unwrap();
        provider.commit().unwrap();
    }

    let provider = factory.database_provider_ro().unwrap();

    let result = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(result, None, "Should heal by pruning, no unwind needed");

    // The first 6 hashes must still be present.
    for (i, hash) in tx_hashes.iter().take(6).enumerate() {
        assert!(
            rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_some(),
            "tx {} should remain",
            i
        );
    }
    // Everything after them must be gone.
    for (i, hash) in tx_hashes.iter().skip(6).enumerate() {
        assert!(
            rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
            "tx {} should be pruned",
            i + 6
        );
    }
}
893
/// `StoragesHistory` holds only `u64::MAX`-keyed shards while the checkpoint is 100 and no
/// static file data exists: the check returns unwind target 0.
#[test]
fn test_check_consistency_storages_history_sentinel_only_with_checkpoint_is_first_run() {
    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    // Two shards whose highest block number is `u64::MAX`.
    let sentinel_keys = [
        StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX),
        StorageShardedKey::new(Address::random(), B256::random(), u64::MAX),
    ];
    let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
    for key in sentinel_keys {
        rocksdb.put::<tables::StoragesHistory>(key, &block_list).unwrap();
    }

    // Sanity check: the table is not empty.
    assert!(rocksdb.first::<tables::StoragesHistory>().unwrap().is_some());

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let provider_rw = factory.database_provider_rw().unwrap();
    provider_rw
        .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
        .unwrap();
    provider_rw.commit().unwrap();

    let provider_ro = factory.database_provider_ro().unwrap();
    let unwind_target = rocksdb.check_consistency(&provider_ro).unwrap();

    assert_eq!(unwind_target, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
}
931
/// `AccountsHistory` holds only `u64::MAX`-keyed shards while the checkpoint is 100 and no
/// static file data exists: the check returns unwind target 0.
#[test]
fn test_check_consistency_accounts_history_sentinel_only_with_checkpoint_is_first_run() {
    use reth_db_api::models::ShardedKey;

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    // Two shards whose highest block number is `u64::MAX`.
    let sentinel_keys =
        [ShardedKey::new(Address::ZERO, u64::MAX), ShardedKey::new(Address::random(), u64::MAX)];
    let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
    for key in sentinel_keys {
        rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
    }

    // Sanity check: the table is not empty.
    assert!(rocksdb.first::<tables::AccountsHistory>().unwrap().is_some());

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let provider_rw = factory.database_provider_rw().unwrap();
    provider_rw
        .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
        .unwrap();
    provider_rw.commit().unwrap();

    let provider_ro = factory.database_provider_ro().unwrap();
    let unwind_target = rocksdb.check_consistency(&provider_ro).unwrap();

    assert_eq!(unwind_target, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
}
970
/// Pruning by tx range removes exactly the hashes of the transactions in that range and
/// leaves the entries for tx numbers `0..=5` in place.
#[test]
fn test_prune_transaction_hash_numbers_by_range() {
    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .build()
        .unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let mut rng = generators::rng();
    let block_params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
    let blocks = generators::random_block_range(&mut rng, 0..=5, block_params);

    // Insert the blocks and mirror every tx hash into RocksDB; the position of a hash in
    // `tx_hashes` is the tx number stored for it.
    let mut tx_hashes = Vec::new();
    {
        let provider = factory.database_provider_rw().unwrap();

        for block in &blocks {
            provider.insert_block(&block.clone().try_recover().expect("recover block")).unwrap();

            for tx in &block.body().transactions {
                let hash = *tx.tx_hash();
                let tx_num = tx_hashes.len() as u64;
                rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
                tx_hashes.push(hash);
            }
        }

        provider
            .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
            .unwrap();
        provider.commit().unwrap();
    }
    let tx_count = tx_hashes.len() as u64;

    let rocksdb_count_before = rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().count();
    assert_eq!(
        rocksdb_count_before, tx_count as usize,
        "RocksDB should have all {} transaction hashes before pruning",
        tx_count
    );

    let provider = factory.database_provider_ro().unwrap();

    // The provider must hand back the same transactions, in the same order.
    let all_txs = provider.transactions_by_tx_range(0..tx_count).unwrap();
    assert_eq!(all_txs.len(), tx_count as usize, "Should be able to fetch all transactions");

    for (i, (tx, stored_hash)) in all_txs.iter().zip(&tx_hashes).enumerate() {
        let fetched_hash = *tx.tx_hash();
        assert_eq!(
            fetched_hash, *stored_hash,
            "Hash mismatch for tx {}: stored {:?} vs fetched {:?}",
            i, stored_hash, fetched_hash
        );
    }

    // Keep tx numbers 0..=5, prune everything above.
    let max_tx_to_keep = 5u64;
    let prune_range = (max_tx_to_keep + 1)..=(tx_count - 1);

    rocksdb
        .prune_transaction_hash_numbers_in_range(&provider, prune_range)
        .expect("prune should succeed");

    let remaining_tx_nums: Vec<u64> = rocksdb
        .iter::<tables::TransactionHashNumbers>()
        .unwrap()
        .map(|entry| entry.unwrap().1)
        .collect();
    for tx_num in &remaining_tx_nums {
        assert!(
            *tx_num <= max_tx_to_keep,
            "Transaction {} should have been pruned (> {})",
            tx_num,
            max_tx_to_keep
        );
    }
    assert_eq!(
        remaining_tx_nums.len(),
        (max_tx_to_keep + 1) as usize,
        "Should have {} transactions (0-{})",
        max_tx_to_keep + 1,
        max_tx_to_keep
    );
}
1084
/// An `IndexAccountHistory` checkpoint of 100 with no static file data yields unwind
/// target 0.
#[test]
fn test_check_consistency_accounts_history_empty_with_checkpoint_is_first_run() {
    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Persist the checkpoint; nothing is written to static files.
    let provider_rw = factory.database_provider_rw().unwrap();
    provider_rw
        .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
        .unwrap();
    provider_rw.commit().unwrap();

    let provider_ro = factory.database_provider_ro().unwrap();
    let unwind_target = rocksdb.check_consistency(&provider_ro).unwrap();

    assert_eq!(unwind_target, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
}
1111
/// Genesis `AccountsHistory` entries survive the consistency check when no checkpoint and no
/// static file data exist.
#[test]
fn test_check_consistency_accounts_history_preserves_genesis_entries_at_checkpoint_zero(
) -> eyre::Result<()> {
    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());
    let rocksdb = factory.rocksdb_provider();

    // Write the genesis history entries.
    let provider_rw = factory.database_provider_rw().unwrap();
    let spec = factory.chain_spec();
    insert_genesis_history(&provider_rw, spec.genesis.alloc.iter())?;
    provider_rw.commit()?;

    let provider_ro = factory.database_provider_ro().unwrap();
    let unwind_target = rocksdb.check_consistency(&provider_ro).unwrap();
    assert_eq!(unwind_target, None, "Should heal by pruning, no unwind needed");

    let remaining_entries = rocksdb.iter::<tables::AccountsHistory>().unwrap().count();
    assert!(remaining_entries > 0, "Genesis entries should be preserved");

    Ok(())
}
1139
/// When the `AccountChangeSets` static file tip equals the `IndexAccountHistory` checkpoint
/// (both 100), the check requests no unwind and leaves `AccountsHistory` untouched.
#[test]
fn test_check_consistency_accounts_history_sf_tip_equals_checkpoint_no_action() {
    use reth_db::models::AccountBeforeTx;
    use reth_db_api::models::ShardedKey;
    use reth_static_file_types::StaticFileSegment;

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    // Three pre-existing shards: two with a concrete highest block number and one keyed with
    // `u64::MAX`.
    let key1 = ShardedKey::new(Address::ZERO, 50);
    let key2 = ShardedKey::new(Address::random(), 75);
    let key3 = ShardedKey::new(Address::random(), u64::MAX);
    let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
    let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 75]);
    let block_list3 = BlockNumberList::new_pre_sorted([80, 90, 100]);
    rocksdb.put::<tables::AccountsHistory>(key1, &block_list1).unwrap();
    rocksdb.put::<tables::AccountsHistory>(key2, &block_list2).unwrap();
    rocksdb.put::<tables::AccountsHistory>(key3, &block_list3).unwrap();

    // Snapshot of the table to compare against after the check.
    let entries_before: Vec<_> =
        rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|r| r.unwrap()).collect();
    assert_eq!(entries_before.len(), 3, "Should have 3 entries before check");

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    {
        // Write one account changeset per block for blocks 0..=100 so the static file tip
        // ends up at 100.
        let sf_provider = factory.static_file_provider();
        let mut writer =
            sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        for block_num in 0..=100 {
            let changeset = vec![AccountBeforeTx { address: Address::random(), info: None }];
            writer.append_account_changeset(changeset, block_num).unwrap();
        }

        writer.commit().unwrap();
    }

    {
        // Checkpoint matches the static file tip.
        let provider = factory.database_provider_rw().unwrap();
        provider
            .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
            .unwrap();
        provider.commit().unwrap();
    }

    let provider = factory.database_provider_ro().unwrap();

    // Confirm the precondition before running the check.
    let sf_tip = provider
        .static_file_provider()
        .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
        .unwrap();
    assert_eq!(sf_tip, 100, "Static file tip should be 100");

    let result = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(result, None, "sf_tip == checkpoint should not require unwind");

    let entries_after: Vec<_> =
        rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|r| r.unwrap()).collect();

    assert_eq!(
        entries_after.len(),
        entries_before.len(),
        "RocksDB entry count should be unchanged when sf_tip == checkpoint"
    );

    // Compare key, highest block number and block list entry by entry, in iteration order.
    for (before, after) in entries_before.iter().zip(entries_after.iter()) {
        assert_eq!(before.0.key, after.0.key, "Entry key should be unchanged");
        assert_eq!(
            before.0.highest_block_number, after.0.highest_block_number,
            "Entry highest_block_number should be unchanged"
        );
        assert_eq!(before.1, after.1, "Entry block list should be unchanged");
    }
}
1225
#[test]
/// Writes one storage changeset per block for blocks `0..15_000` into static
/// files, sets the `IndexStorageHistory` checkpoint to 5_000 and seeds RocksDB
/// with one `StoragesHistory` entry for every block above the checkpoint.
/// `check_consistency` is then expected to return `None` and leave no entry
/// whose `highest_block_number` exceeds the checkpoint.
///
/// NOTE(review): the range above the checkpoint is 9_999 blocks, which is below
/// `HEAL_HISTORY_BATCH_SIZE` (10_000) — confirm whether this test is meant to
/// span more than one heal batch.
fn test_check_consistency_storages_history_heals_via_changesets_large_range() {
    use alloy_primitives::U256;
    use reth_db_api::models::StorageBeforeTx;

    const TOTAL_BLOCKS: u64 = 15_000;
    const CHECKPOINT_BLOCK: u64 = 5_000;

    /// Returns an `N`-byte array whose first eight bytes are `block_num` in
    /// little-endian order and whose remaining bytes are zero.
    fn le_prefixed<const N: usize>(block_num: u64) -> [u8; N] {
        let mut bytes = [0u8; N];
        bytes[..8].copy_from_slice(&block_num.to_le_bytes());
        bytes
    }

    /// Deterministic per-block address.
    fn make_address(block_num: u64) -> Address {
        Address::from(le_prefixed::<20>(block_num))
    }

    /// Deterministic per-block storage slot.
    fn make_slot(block_num: u64) -> B256 {
        B256::from(le_prefixed::<32>(block_num))
    }

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Populate the storage changeset static files: one entry per block.
    {
        let sf_provider = factory.static_file_provider();
        let mut writer = sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();

        for block_num in 0..TOTAL_BLOCKS {
            let entry = StorageBeforeTx {
                address: make_address(block_num),
                key: make_slot(block_num),
                value: U256::from(block_num),
            };
            writer.append_storage_changeset(vec![entry], block_num).unwrap();
        }

        writer.commit().unwrap();
    }

    // The static-file tip must be the last block written.
    let highest = factory
        .static_file_provider()
        .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
        .unwrap();
    assert_eq!(highest, TOTAL_BLOCKS - 1, "Static files should have blocks 0..14999");

    // Record a checkpoint well below the static-file tip.
    {
        let provider_rw = factory.database_provider_rw().unwrap();
        provider_rw
            .save_stage_checkpoint(
                StageId::IndexStorageHistory,
                StageCheckpoint::new(CHECKPOINT_BLOCK),
            )
            .unwrap();
        provider_rw.commit().unwrap();
    }

    // Seed RocksDB with an entry for every block strictly above the checkpoint.
    let expected_stale = TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1;
    for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
        rocksdb
            .put::<tables::StoragesHistory>(
                StorageShardedKey::new(make_address(block_num), make_slot(block_num), block_num),
                &BlockNumberList::new_pre_sorted([block_num]),
            )
            .unwrap();
    }

    let count_before: usize = rocksdb.iter::<tables::StoragesHistory>().unwrap().count();
    assert_eq!(
        count_before, expected_stale as usize,
        "Should have {} stale entries before healing",
        expected_stale
    );

    let provider = factory.database_provider_ro().unwrap();
    let result = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(result, None, "Should heal via changesets, no unwind needed");

    // Nothing keyed above the checkpoint may be left behind.
    let remaining_stale = rocksdb
        .iter::<tables::StoragesHistory>()
        .unwrap()
        .map(|row| row.unwrap().0)
        .filter(|key| key.sharded_key.highest_block_number > CHECKPOINT_BLOCK)
        .count();
    assert_eq!(
        remaining_stale, 0,
        "All stale entries (block > {}) should be pruned",
        CHECKPOINT_BLOCK
    );
}
1350
#[test]
/// With the `IndexStorageHistory` checkpoint at block 100 and storage changesets
/// in static files up to block 200, `check_consistency` must return `None`, keep
/// the `StoragesHistory` entry keyed exactly at the checkpoint block, and remove
/// the entry that only contains blocks above the checkpoint.
fn test_check_consistency_storages_history_preserves_checkpoint_block() {
    use alloy_primitives::U256;
    use reth_db_api::models::StorageBeforeTx;
    use std::cmp::Ordering;

    const CHECKPOINT_BLOCK: u64 = 100;
    const SF_TIP: u64 = 200;

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Distinct (address, slot) pairs for the checkpoint block and for later blocks.
    let (checkpoint_addr, checkpoint_slot) = (Address::repeat_byte(0xAA), B256::repeat_byte(0xBB));
    let (stale_addr, stale_slot) = (Address::repeat_byte(0xCC), B256::repeat_byte(0xDD));

    // Static files: one changeset entry per block, chosen by the block's
    // position relative to the checkpoint.
    {
        let sf_provider = factory.static_file_provider();
        let mut writer = sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();

        for block_num in 0..=SF_TIP {
            let (address, key, value) = match block_num.cmp(&CHECKPOINT_BLOCK) {
                Ordering::Equal => (checkpoint_addr, checkpoint_slot, U256::from(block_num)),
                Ordering::Greater => (stale_addr, stale_slot, U256::from(block_num)),
                Ordering::Less => (Address::ZERO, B256::ZERO, U256::ZERO),
            };
            writer
                .append_storage_changeset(vec![StorageBeforeTx { address, key, value }], block_num)
                .unwrap();
        }
        writer.commit().unwrap();
    }

    // Stage checkpoint sits below the static-file tip.
    {
        let provider_rw = factory.database_provider_rw().unwrap();
        provider_rw
            .save_stage_checkpoint(
                StageId::IndexStorageHistory,
                StageCheckpoint::new(CHECKPOINT_BLOCK),
            )
            .unwrap();
        provider_rw.commit().unwrap();
    }

    // Entry keyed exactly at the checkpoint block.
    let checkpoint_key = StorageShardedKey::new(checkpoint_addr, checkpoint_slot, CHECKPOINT_BLOCK);
    rocksdb
        .put::<tables::StoragesHistory>(
            checkpoint_key.clone(),
            &BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]),
        )
        .unwrap();

    // Entry whose blocks are all above the checkpoint.
    let stale_key = StorageShardedKey::new(stale_addr, stale_slot, SF_TIP);
    rocksdb
        .put::<tables::StoragesHistory>(
            stale_key.clone(),
            &BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]),
        )
        .unwrap();

    let provider = factory.database_provider_ro().unwrap();
    let result = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(result, None, "Should heal without unwind");

    let preserved = rocksdb.get::<tables::StoragesHistory>(checkpoint_key).unwrap();
    assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");

    let stale = rocksdb.get::<tables::StoragesHistory>(stale_key).unwrap();
    assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
}
1440
#[test]
/// Writes one account changeset per block for blocks `0..15_000` into static
/// files, seeds RocksDB with an `AccountsHistory` entry for every block above
/// the checkpoint (5_000) plus five entries at or below it, then expects
/// `check_consistency` to return `None` and leave no entry whose
/// `highest_block_number` is above the checkpoint (ignoring `u64::MAX` shards).
fn test_check_consistency_accounts_history_heals_via_changesets_large_range() {
    use reth_db::models::AccountBeforeTx;
    use reth_db_api::models::ShardedKey;
    use reth_static_file_types::StaticFileSegment;

    const TOTAL_BLOCKS: u64 = 15_000;
    const CHECKPOINT_BLOCK: u64 = 5_000;
    // Blocks at or below the checkpoint that also get a history entry.
    const VALID_BLOCKS: [u64; 5] = [100, 500, 1000, 2500, 5000];

    /// Deterministic per-block address: the block number in little-endian order
    /// in the first eight bytes, zeros elsewhere.
    fn make_address(block_num: u64) -> Address {
        let mut bytes = [0u8; 20];
        bytes[..8].copy_from_slice(&block_num.to_le_bytes());
        Address::from(bytes)
    }

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Populate the account changeset static files: one entry per block.
    {
        let sf_provider = factory.static_file_provider();
        let mut writer = sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        for block_num in 0..TOTAL_BLOCKS {
            let entry = AccountBeforeTx { address: make_address(block_num), info: None };
            writer.append_account_changeset(vec![entry], block_num).unwrap();
        }

        writer.commit().unwrap();
    }

    // Seed RocksDB: every block above the checkpoint, followed by the valid blocks.
    for block_num in ((CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS).chain(VALID_BLOCKS) {
        rocksdb
            .put::<tables::AccountsHistory>(
                ShardedKey::new(make_address(block_num), block_num),
                &BlockNumberList::new_pre_sorted([block_num]),
            )
            .unwrap();
    }

    let entries_before: usize = rocksdb.iter::<tables::AccountsHistory>().unwrap().count();
    let stale_count = (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize;
    let valid_count = VALID_BLOCKS.len();
    assert_eq!(
        entries_before,
        stale_count + valid_count,
        "Should have {} stale + {} valid entries before healing",
        stale_count,
        valid_count
    );

    // Record a checkpoint well below the static-file tip.
    {
        let provider_rw = factory.database_provider_rw().unwrap();
        provider_rw
            .save_stage_checkpoint(
                StageId::IndexAccountHistory,
                StageCheckpoint::new(CHECKPOINT_BLOCK),
            )
            .unwrap();
        provider_rw.commit().unwrap();
    }

    let provider = factory.database_provider_ro().unwrap();

    // Preconditions: tip is the last written block and lies above the checkpoint.
    let sf_tip = provider
        .static_file_provider()
        .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
        .unwrap();
    assert_eq!(sf_tip, TOTAL_BLOCKS - 1, "Static file tip should be 14999");
    assert!(sf_tip > CHECKPOINT_BLOCK, "sf_tip should be > checkpoint to trigger healing");

    let result = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(result, None, "Healing should succeed without requiring unwind");

    // Count entries still keyed above the checkpoint, skipping `u64::MAX` shards.
    let remaining_stale = rocksdb
        .iter::<tables::AccountsHistory>()
        .unwrap()
        .map(|row| row.unwrap().0)
        .filter(|key| {
            key.highest_block_number > CHECKPOINT_BLOCK && key.highest_block_number != u64::MAX
        })
        .count();
    assert_eq!(
        remaining_stale, 0,
        "All stale entries (block > {}) should be pruned",
        CHECKPOINT_BLOCK
    );
}
1565
#[test]
/// With the `IndexAccountHistory` checkpoint at block 100 and account changesets
/// in static files up to block 200, `check_consistency` must return `None`, keep
/// the `AccountsHistory` entry keyed exactly at the checkpoint block, and remove
/// the entry that only contains blocks above the checkpoint.
fn test_check_consistency_accounts_history_preserves_checkpoint_block() {
    use reth_db::models::AccountBeforeTx;
    use reth_db_api::models::ShardedKey;
    use std::cmp::Ordering;

    const CHECKPOINT_BLOCK: u64 = 100;
    const SF_TIP: u64 = 200;

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Distinct addresses for the checkpoint block and for later blocks.
    let checkpoint_addr = Address::repeat_byte(0xAA);
    let stale_addr = Address::repeat_byte(0xCC);

    // Static files: one changeset entry per block, with the address chosen by
    // the block's position relative to the checkpoint.
    {
        let sf_provider = factory.static_file_provider();
        let mut writer = sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        for block_num in 0..=SF_TIP {
            let address = match block_num.cmp(&CHECKPOINT_BLOCK) {
                Ordering::Equal => checkpoint_addr,
                Ordering::Greater => stale_addr,
                Ordering::Less => Address::ZERO,
            };
            writer
                .append_account_changeset(vec![AccountBeforeTx { address, info: None }], block_num)
                .unwrap();
        }
        writer.commit().unwrap();
    }

    // Stage checkpoint sits below the static-file tip.
    {
        let provider_rw = factory.database_provider_rw().unwrap();
        provider_rw
            .save_stage_checkpoint(
                StageId::IndexAccountHistory,
                StageCheckpoint::new(CHECKPOINT_BLOCK),
            )
            .unwrap();
        provider_rw.commit().unwrap();
    }

    // Entry keyed exactly at the checkpoint block.
    let checkpoint_key = ShardedKey::new(checkpoint_addr, CHECKPOINT_BLOCK);
    rocksdb
        .put::<tables::AccountsHistory>(
            checkpoint_key.clone(),
            &BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]),
        )
        .unwrap();

    // Entry whose blocks are all above the checkpoint.
    let stale_key = ShardedKey::new(stale_addr, SF_TIP);
    rocksdb
        .put::<tables::AccountsHistory>(
            stale_key.clone(),
            &BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]),
        )
        .unwrap();

    let provider = factory.database_provider_ro().unwrap();
    let result = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(result, None, "Should heal without unwind");

    let preserved = rocksdb.get::<tables::AccountsHistory>(checkpoint_key).unwrap();
    assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");

    let stale = rocksdb.get::<tables::AccountsHistory>(stale_key).unwrap();
    assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
}
1638
#[test]
/// Checks that `check_consistency` leaves `StoragesHistory` alone when the
/// `StorageChangeSets` static-file tip and the `IndexStorageHistory` checkpoint
/// are both at block 100: no unwind target is returned and every RocksDB row is
/// identical before and after the call.
fn test_check_consistency_storages_history_sf_tip_equals_checkpoint_no_action() {
    use alloy_primitives::U256;
    use reth_db::models::StorageBeforeTx;
    use reth_static_file_types::StaticFileSegment;

    // Block shared by the static-file tip and the stage checkpoint.
    const TIP: u64 = 100;

    let temp_dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

    // Two history shards, both keyed at or below the checkpoint.
    let seeded = [
        (
            StorageShardedKey::new(Address::ZERO, B256::ZERO, 50),
            BlockNumberList::new_pre_sorted([10, 20, 30, 50]),
        ),
        (
            StorageShardedKey::new(Address::random(), B256::random(), 80),
            BlockNumberList::new_pre_sorted([40, 60, 80]),
        ),
    ];
    for (shard_key, blocks) in seeded {
        rocksdb.put::<tables::StoragesHistory>(shard_key, &blocks).unwrap();
    }

    // Reads the whole table into memory so two points in time can be compared.
    let snapshot = || {
        rocksdb
            .iter::<tables::StoragesHistory>()
            .unwrap()
            .map(|row| row.unwrap())
            .collect::<Vec<_>>()
    };

    let entries_before = snapshot();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // One single-entry storage changeset per block in 0..=TIP.
    {
        let sf_provider = factory.static_file_provider();
        let mut writer = sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();

        for block_num in 0..=TIP {
            let entry = StorageBeforeTx {
                address: Address::ZERO,
                key: B256::with_last_byte(block_num as u8),
                value: U256::from(block_num),
            };
            writer.append_storage_changeset(vec![entry], block_num).unwrap();
        }
        writer.commit().unwrap();
    }

    // Persist the stage checkpoint at the same block as the static-file tip.
    {
        let provider_rw = factory.database_provider_rw().unwrap();
        provider_rw
            .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(TIP))
            .unwrap();
        provider_rw.commit().unwrap();
    }

    let provider = factory.database_provider_ro().unwrap();

    // Sanity-check the precondition before exercising the code under test.
    let sf_tip = provider
        .static_file_provider()
        .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
        .unwrap();
    assert_eq!(sf_tip, TIP, "Static file tip should be 100");

    let result = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(result, None, "sf_tip == checkpoint should not require unwind");

    let entries_after = snapshot();

    assert_eq!(
        entries_after.len(),
        entries_before.len(),
        "RocksDB entry count should be unchanged when sf_tip == checkpoint"
    );

    // Row-by-row comparison of keys and block lists.
    for ((key_before, list_before), (key_after, list_after)) in
        entries_before.iter().zip(entries_after.iter())
    {
        assert_eq!(key_before, key_after, "Entry key should be unchanged");
        assert_eq!(list_before, list_after, "Entry block list should be unchanged");
    }
}
1719}