Skip to main content

reth_provider/providers/rocksdb/
invariants.rs

1//! Invariant checking for `RocksDB` tables.
2//!
3//! This module provides consistency checks for tables stored in `RocksDB`, similar to the
4//! consistency checks for static files. The goal is to detect and potentially heal
5//! inconsistencies between `RocksDB` data and MDBX checkpoints.
6
7use super::RocksDBProvider;
8use crate::StaticFileProviderFactory;
9use alloy_eips::eip2718::Encodable2718;
10use alloy_primitives::BlockNumber;
11use rayon::prelude::*;
12use reth_chainspec::{ChainSpecProvider, EthChainSpec};
13use reth_db::models::{storage_sharded_key::StorageShardedKey, ShardedKey};
14use reth_db_api::tables;
15use reth_stages_types::StageId;
16use reth_static_file_types::StaticFileSegment;
17use reth_storage_api::{
18    BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StageCheckpointReader,
19    StorageChangeSetReader, StorageSettingsCache, TransactionsProvider,
20};
21use reth_storage_errors::provider::ProviderResult;
22use std::collections::HashSet;
23
/// Batch size for changeset iteration during history healing.
/// Balances memory usage against iteration overhead.
///
/// Used by both the `StoragesHistory` and `AccountsHistory` healing loops below.
const HEAL_HISTORY_BATCH_SIZE: u64 = 10_000;
27
28impl RocksDBProvider {
29    /// Checks consistency of `RocksDB` tables against MDBX stage checkpoints.
30    ///
31    /// Returns an unwind target block number if the pipeline needs to unwind to rebuild
32    /// `RocksDB` data. Returns `None` if all invariants pass or if inconsistencies were healed.
33    ///
34    /// # Invariants checked
35    ///
36    /// For `TransactionHashNumbers`:
37    /// - The maximum `TxNumber` value should not exceed what the `TransactionLookup` stage
38    ///   checkpoint indicates has been processed.
39    /// - If `RocksDB` is ahead, excess entries are pruned (healed).
40    /// - If `RocksDB` is behind, an unwind is required.
41    ///
42    /// For `StoragesHistory` and `AccountsHistory`:
43    /// - Uses changesets to heal stale entries when static file tip > checkpoint.
44    ///
45    /// # Requirements
46    ///
47    /// For pruning `TransactionHashNumbers`, the provider must be able to supply transaction
48    /// data (typically from static files) so that transaction hashes can be computed. This
49    /// implies that static files should be ahead of or in sync with `RocksDB`.
50    pub fn check_consistency<Provider>(
51        &self,
52        provider: &Provider,
53    ) -> ProviderResult<Option<BlockNumber>>
54    where
55        Provider: DBProvider
56            + StageCheckpointReader
57            + StorageSettingsCache
58            + StaticFileProviderFactory
59            + BlockBodyIndicesProvider
60            + StorageChangeSetReader
61            + ChangeSetReader
62            + TransactionsProvider<Transaction: Encodable2718>
63            + ChainSpecProvider,
64    {
65        let mut unwind_target: Option<BlockNumber> = None;
66
67        // Heal TransactionHashNumbers if stored in RocksDB
68        if provider.cached_storage_settings().storage_v2 &&
69            let Some(target) = self.heal_transaction_hash_numbers(provider)?
70        {
71            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
72        }
73
74        // Heal StoragesHistory if stored in RocksDB
75        if provider.cached_storage_settings().storage_v2 &&
76            let Some(target) = self.heal_storages_history(provider)?
77        {
78            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
79        }
80
81        // Heal AccountsHistory if stored in RocksDB
82        if provider.cached_storage_settings().storage_v2 &&
83            let Some(target) = self.heal_accounts_history(provider)?
84        {
85            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
86        }
87
88        Ok(unwind_target)
89    }
90
    /// Heals the `TransactionHashNumbers` table.
    ///
    /// - Fast path: if checkpoint == 0, clear any stale data and return
    /// - If `sf_tip` < checkpoint, return unwind target (static files behind)
    /// - If `sf_tip` == checkpoint, nothing to do
    /// - If `sf_tip` > checkpoint, heal via transaction ranges in batches
    ///
    /// Returns `Ok(None)` when consistent (or healed in place), `Ok(Some(block))` when the
    /// caller must unwind the pipeline down to `block`.
    fn heal_transaction_hash_numbers<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider
            + StageCheckpointReader
            + StaticFileProviderFactory
            + BlockBodyIndicesProvider
            + TransactionsProvider<Transaction: Encodable2718>,
    {
        // Highest block the TransactionLookup stage has fully processed (0 if never run).
        let checkpoint = provider
            .get_stage_checkpoint(StageId::TransactionLookup)?
            .map(|cp| cp.block_number)
            .unwrap_or(0);

        // Highest block present in the Transactions static file segment (0 if none exist).
        let sf_tip = provider
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::Transactions)
            .unwrap_or(0);

        // Fast path: clear any stale data and return.
        if checkpoint == 0 {
            if self.first::<tables::TransactionHashNumbers>()?.is_some() {
                tracing::info!(
                    target: "reth::providers::rocksdb",
                    "TransactionHashNumbers: checkpoint is 0, clearing stale data"
                );
                self.clear::<tables::TransactionHashNumbers>()?;
            }

            return Ok(None);
        }

        if sf_tip < checkpoint {
            // This should never happen in normal operation - static files are always committed
            // before RocksDB. If we get here, something is seriously wrong. The unwind is a
            // best-effort attempt but is probably futile.
            tracing::warn!(
                target: "reth::providers::rocksdb",
                sf_tip,
                checkpoint,
                "TransactionHashNumbers: static file tip behind checkpoint, unwind needed"
            );
            return Ok(Some(sf_tip));
        }

        // sf_tip == checkpoint - nothing to do
        if sf_tip == checkpoint {
            return Ok(None);
        }

        // Get end tx from static files (authoritative for sf_tip)
        let sf_tip_end_tx = provider
            .static_file_provider()
            .get_highest_static_file_tx(StaticFileSegment::Transactions)
            .unwrap_or(0);

        // Get the first tx after the checkpoint block from MDBX (authoritative up to checkpoint)
        // NOTE(review): if body indices for `checkpoint` are missing this falls back to 0,
        // which would make the pruning loop below start at tx 0 and delete entries at or
        // below the checkpoint — confirm this conservative fallback is intended.
        let checkpoint_next_tx = provider
            .block_body_indices(checkpoint)?
            .map(|indices| indices.next_tx_num())
            .unwrap_or(0);

        if sf_tip_end_tx < checkpoint_next_tx {
            // This should never happen in normal operation - static files should have all
            // transactions up to sf_tip. If we get here, something is seriously wrong.
            // The unwind is a best-effort attempt but is probably futile.
            tracing::warn!(
                target: "reth::providers::rocksdb",
                sf_tip_end_tx,
                checkpoint_next_tx,
                checkpoint,
                sf_tip,
                "TransactionHashNumbers: static file tx tip behind checkpoint, unwind needed"
            );
            return Ok(Some(sf_tip));
        }

        tracing::info!(
            target: "reth::providers::rocksdb",
            checkpoint,
            sf_tip,
            checkpoint_next_tx,
            sf_tip_end_tx,
            "TransactionHashNumbers: healing via transaction ranges"
        );

        // Prune every entry past the checkpoint in fixed-size tx-number batches so a
        // single write batch stays bounded in size.
        const BATCH_SIZE: u64 = 10_000;
        let mut batch_start = checkpoint_next_tx;

        while batch_start <= sf_tip_end_tx {
            // Inclusive batch end, clamped to the static-file tx tip.
            let batch_end = batch_start.saturating_add(BATCH_SIZE - 1).min(sf_tip_end_tx);

            tracing::debug!(
                target: "reth::providers::rocksdb",
                batch_start,
                batch_end,
                "Pruning TransactionHashNumbers batch"
            );

            self.prune_transaction_hash_numbers_in_range(provider, batch_start..=batch_end)?;

            batch_start = batch_end.saturating_add(1);
        }

        Ok(None)
    }
205
206    /// Prunes `TransactionHashNumbers` entries for transactions in the given range.
207    ///
208    /// This fetches transactions from the provider, computes their hashes in parallel,
209    /// and deletes the corresponding entries from `RocksDB` by key. This approach is more
210    /// scalable than iterating all rows because it only processes the transactions that
211    /// need to be pruned.
212    ///
213    /// # Requirements
214    ///
215    /// The provider must be able to supply transaction data (typically from static files)
216    /// so that transaction hashes can be computed. This implies that static files should
217    /// be ahead of or in sync with `RocksDB`.
218    fn prune_transaction_hash_numbers_in_range<Provider>(
219        &self,
220        provider: &Provider,
221        tx_range: std::ops::RangeInclusive<u64>,
222    ) -> ProviderResult<()>
223    where
224        Provider: TransactionsProvider<Transaction: Encodable2718>,
225    {
226        if tx_range.is_empty() {
227            return Ok(());
228        }
229
230        // Fetch transactions in the range and compute their hashes in parallel
231        let hashes: Vec<_> = provider
232            .transactions_by_tx_range(tx_range.clone())?
233            .into_par_iter()
234            .map(|tx| tx.trie_hash())
235            .collect();
236
237        if !hashes.is_empty() {
238            tracing::info!(
239                target: "reth::providers::rocksdb",
240                deleted_count = hashes.len(),
241                tx_range_start = *tx_range.start(),
242                tx_range_end = *tx_range.end(),
243                "Pruning TransactionHashNumbers entries by tx range"
244            );
245
246            let mut batch = self.batch();
247            for hash in hashes {
248                batch.delete::<tables::TransactionHashNumbers>(hash)?;
249            }
250            batch.commit()?;
251        }
252
253        Ok(())
254    }
255
    /// Heals the `StoragesHistory` table by removing stale entries.
    ///
    /// Returns an unwind target if static file tip is behind checkpoint (cannot heal).
    /// Otherwise iterates changesets in batches to identify and unwind affected keys.
    fn heal_storages_history<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider
            + StageCheckpointReader
            + StaticFileProviderFactory
            + StorageChangeSetReader
            + ChainSpecProvider,
    {
        // Highest block the IndexStorageHistory stage has fully processed (0 if never run).
        let checkpoint = provider
            .get_stage_checkpoint(StageId::IndexStorageHistory)?
            .map(|cp| cp.block_number)
            .unwrap_or(0);

        // Fast path: clear and re-insert genesis history.
        if checkpoint == 0 {
            tracing::info!(
                target: "reth::providers::rocksdb",
                "StoragesHistory: checkpoint is 0, clearing stale data"
            );
            self.clear::<tables::StoragesHistory>()?;

            // Re-create a history shard for every storage slot in the genesis allocation,
            // each containing only block 0.
            let chain_spec = provider.chain_spec();
            let genesis = chain_spec.genesis();
            let list = tables::BlockNumberList::new([0]).expect("single block always fits");
            for (addr, account) in &genesis.alloc {
                if let Some(storage) = &account.storage {
                    for key in storage.keys() {
                        self.put::<tables::StoragesHistory>(
                            StorageShardedKey::last(*addr, *key),
                            &list,
                        )?;
                    }
                }
            }

            return Ok(None);
        }

        // Highest block present in the StorageChangeSets static file segment (0 if none).
        let sf_tip = provider
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
            .unwrap_or(0);

        if sf_tip < checkpoint {
            // This should never happen in normal operation - static files are always
            // committed before RocksDB. If we get here, something is seriously wrong.
            // The unwind is a best-effort attempt but is probably futile.
            tracing::warn!(
                target: "reth::providers::rocksdb",
                sf_tip,
                checkpoint,
                "StoragesHistory: static file tip behind checkpoint, unwind needed"
            );
            return Ok(Some(sf_tip));
        }

        // In sync - nothing to heal.
        if sf_tip == checkpoint {
            return Ok(None);
        }

        let total_blocks = sf_tip - checkpoint;
        tracing::info!(
            target: "reth::providers::rocksdb",
            checkpoint,
            sf_tip,
            total_blocks,
            "StoragesHistory: healing via changesets"
        );

        let mut batch_start = checkpoint + 1;
        let mut batch_num = 0u64;
        let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);

        // Walk the changesets in fixed-size block batches and unwind history for every
        // (address, slot) touched past the checkpoint.
        while batch_start <= sf_tip {
            let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
            batch_num += 1;

            let changesets = provider.storage_changesets_range(batch_start..=batch_end)?;

            // Deduplicate per (address, slot); the third tuple element is the block to
            // unwind from, which is the same (checkpoint + 1) for every key.
            let unique_keys: HashSet<_> = changesets
                .into_iter()
                .map(|(block_addr, entry)| (block_addr.address(), entry.key, checkpoint + 1))
                .collect();
            let indices: Vec<_> = unique_keys.into_iter().collect();

            if !indices.is_empty() {
                tracing::info!(
                    target: "reth::providers::rocksdb",
                    batch_num,
                    total_batches,
                    batch_start,
                    batch_end,
                    indices_count = indices.len(),
                    "StoragesHistory: unwinding batch"
                );

                let batch = self.unwind_storage_history_indices(&indices)?;
                self.commit_batch(batch)?;
            }

            batch_start = batch_end + 1;
        }

        Ok(None)
    }
368
369    /// Heals the `AccountsHistory` table by removing stale entries.
370    ///
371    /// Returns an unwind target if static file tip is behind checkpoint (cannot heal).
372    /// Otherwise iterates changesets in batches to identify and unwind affected keys.
373    fn heal_accounts_history<Provider>(
374        &self,
375        provider: &Provider,
376    ) -> ProviderResult<Option<BlockNumber>>
377    where
378        Provider: DBProvider
379            + StageCheckpointReader
380            + StaticFileProviderFactory
381            + ChangeSetReader
382            + ChainSpecProvider,
383    {
384        let checkpoint = provider
385            .get_stage_checkpoint(StageId::IndexAccountHistory)?
386            .map(|cp| cp.block_number)
387            .unwrap_or(0);
388
389        // Fast path: clear and re-insert genesis history.
390        if checkpoint == 0 {
391            tracing::info!(
392                target: "reth::providers::rocksdb",
393                "AccountsHistory: checkpoint is 0, clearing stale data"
394            );
395            self.clear::<tables::AccountsHistory>()?;
396
397            let chain_spec = provider.chain_spec();
398            let genesis = chain_spec.genesis();
399            let list = tables::BlockNumberList::new([0]).expect("single block always fits");
400            for addr in genesis.alloc.keys() {
401                self.put::<tables::AccountsHistory>(ShardedKey::last(*addr), &list)?;
402            }
403
404            return Ok(None);
405        }
406
407        let sf_tip = provider
408            .static_file_provider()
409            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
410            .unwrap_or(0);
411
412        if sf_tip < checkpoint {
413            // This should never happen in normal operation - static files are always
414            // committed before RocksDB. If we get here, something is seriously wrong.
415            // The unwind is a best-effort attempt but is probably futile.
416            tracing::warn!(
417                target: "reth::providers::rocksdb",
418                sf_tip,
419                checkpoint,
420                "AccountsHistory: static file tip behind checkpoint, unwind needed"
421            );
422            return Ok(Some(sf_tip));
423        }
424
425        if sf_tip == checkpoint {
426            return Ok(None);
427        }
428
429        let total_blocks = sf_tip - checkpoint;
430        tracing::info!(
431            target: "reth::providers::rocksdb",
432            checkpoint,
433            sf_tip,
434            total_blocks,
435            "AccountsHistory: healing via changesets"
436        );
437
438        let mut batch_start = checkpoint + 1;
439        let mut batch_num = 0u64;
440        let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
441
442        while batch_start <= sf_tip {
443            let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
444            batch_num += 1;
445
446            let changesets = provider.account_changesets_range(batch_start..=batch_end)?;
447
448            let mut addresses = HashSet::with_capacity(changesets.len());
449            addresses.extend(changesets.iter().map(|(_, cs)| cs.address));
450            let unwind_from = checkpoint + 1;
451            let indices: Vec<_> = addresses.into_iter().map(|addr| (addr, unwind_from)).collect();
452
453            if !indices.is_empty() {
454                tracing::info!(
455                    target: "reth::providers::rocksdb",
456                    batch_num,
457                    total_batches,
458                    batch_start,
459                    batch_end,
460                    indices_count = indices.len(),
461                    "AccountsHistory: unwinding batch"
462                );
463
464                let batch = self.unwind_account_history_indices(&indices)?;
465                self.commit_batch(batch)?;
466            }
467
468            batch_start = batch_end + 1;
469        }
470
471        Ok(None)
472    }
473}
474
475#[cfg(test)]
476mod tests {
477    use std::sync::Arc;
478
479    use super::*;
480    use crate::{
481        init::insert_genesis_history,
482        providers::{rocksdb::RocksDBBuilder, static_file::StaticFileWriter},
483        test_utils::{create_test_provider_factory, create_test_provider_factory_with_chain_spec},
484        BlockWriter, DatabaseProviderFactory, RocksDBProviderFactory, StageCheckpointWriter,
485        TransactionsProvider,
486    };
487    use alloy_primitives::{Address, B256};
488    use reth_chainspec::MAINNET;
489    use reth_db::cursor::{DbCursorRO, DbCursorRW};
490    use reth_db_api::{
491        models::{storage_sharded_key::StorageShardedKey, StorageSettings},
492        tables::{self, BlockNumberList},
493        transaction::DbTxMut,
494    };
495    use reth_stages_types::StageCheckpoint;
496    use reth_testing_utils::generators::{self, BlockRangeParams};
497    use tempfile::TempDir;
498
499    #[test]
500    fn test_first_last_empty_rocksdb() {
501        let temp_dir = TempDir::new().unwrap();
502        let provider = RocksDBBuilder::new(temp_dir.path())
503            .with_table::<tables::TransactionHashNumbers>()
504            .with_table::<tables::StoragesHistory>()
505            .build()
506            .unwrap();
507
508        // Empty RocksDB, no checkpoints - should be consistent
509        let first = provider.first::<tables::TransactionHashNumbers>().unwrap();
510        let last = provider.last::<tables::TransactionHashNumbers>().unwrap();
511
512        assert!(first.is_none());
513        assert!(last.is_none());
514    }
515
516    #[test]
517    fn test_first_last_with_data() {
518        let temp_dir = TempDir::new().unwrap();
519        let provider = RocksDBBuilder::new(temp_dir.path())
520            .with_table::<tables::TransactionHashNumbers>()
521            .build()
522            .unwrap();
523
524        // Insert some data
525        let tx_hash = B256::from([1u8; 32]);
526        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
527
528        // RocksDB has data
529        let last = provider.last::<tables::TransactionHashNumbers>().unwrap();
530        assert!(last.is_some());
531        assert_eq!(last.unwrap().1, 100);
532    }
533
534    #[test]
535    fn test_check_consistency_empty_rocksdb_no_checkpoint_is_ok() {
536        let temp_dir = TempDir::new().unwrap();
537        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
538
539        // Create a test provider factory for MDBX
540        let factory = create_test_provider_factory();
541        factory.set_storage_settings_cache(StorageSettings::v2());
542
543        let provider = factory.database_provider_ro().unwrap();
544
545        // Empty RocksDB and no checkpoints - should be consistent (None = no unwind needed)
546        let result = rocksdb.check_consistency(&provider).unwrap();
547        assert_eq!(result, None);
548    }
549
550    /// Tests that `checkpoint=0` with empty `RocksDB` returns early without attempting
551    /// an expensive healing loop. Previously, when `sf_tip` > `checkpoint=0`, the healer
552    /// would iterate billions of transactions from static files for no effect, causing
553    /// the node to hang on startup with MDBX read transaction timeouts.
554    #[test]
555    fn test_check_consistency_checkpoint_zero_empty_rocksdb_returns_early() {
556        let temp_dir = TempDir::new().unwrap();
557        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
558
559        let factory = create_test_provider_factory();
560        factory.set_storage_settings_cache(StorageSettings::v2());
561
562        // No checkpoints set — all default to 0 via unwrap_or(0).
563        // RocksDB tables are empty.
564        let provider = factory.database_provider_ro().unwrap();
565
566        let result = rocksdb.heal_transaction_hash_numbers(&provider).unwrap();
567        assert_eq!(result, None, "TransactionHashNumbers should return early at checkpoint 0");
568        assert!(rocksdb.first::<tables::TransactionHashNumbers>().unwrap().is_none());
569
570        let result = rocksdb.heal_storages_history(&provider).unwrap();
571        assert_eq!(result, None, "StoragesHistory should return early at checkpoint 0");
572
573        let result = rocksdb.heal_accounts_history(&provider).unwrap();
574        assert_eq!(result, None, "AccountsHistory should return early at checkpoint 0");
575        // Genesis account history entries are re-inserted
576        assert_eq!(
577            rocksdb.iter::<tables::AccountsHistory>().unwrap().count(),
578            factory.chain_spec().genesis().alloc.len()
579        );
580    }
581
582    #[test]
583    fn test_check_consistency_empty_rocksdb_with_checkpoint_is_first_run() {
584        let temp_dir = TempDir::new().unwrap();
585        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
586
587        // Create a test provider factory for MDBX
588        let factory = create_test_provider_factory();
589        factory.set_storage_settings_cache(StorageSettings::v2());
590
591        // Set a checkpoint indicating we should have processed up to block 100
592        {
593            let provider = factory.database_provider_rw().unwrap();
594            provider
595                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(100))
596                .unwrap();
597            provider.commit().unwrap();
598        }
599
600        let provider = factory.database_provider_ro().unwrap();
601
602        // RocksDB is empty but checkpoint says block 100 was processed.
603        // Since static file tip defaults to 0 when None, and 0 < 100, an unwind is triggered.
604        let result = rocksdb.check_consistency(&provider).unwrap();
605        assert_eq!(result, Some(0), "Static file tip (0) behind checkpoint (100) triggers unwind");
606    }
607
    /// Tests that when checkpoint=0 and `RocksDB` has data, all entries are pruned.
    /// This simulates a crash recovery scenario where the checkpoint was lost.
    #[test]
    fn test_check_consistency_checkpoint_zero_with_rocksdb_data_prunes_all() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // Generate blocks with real transactions and insert them
        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=2,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        // Insert the blocks into MDBX and mirror each tx hash into RocksDB so the
        // table has data to be cleared.
        let mut tx_hashes = Vec::new();
        {
            let provider = factory.database_provider_rw().unwrap();
            let mut tx_count = 0u64;
            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    tx_hashes.push(hash);
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }
            provider.commit().unwrap();
        }

        // Explicitly clear the checkpoints to simulate crash recovery
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(0))
                .unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
                .unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
                .unwrap();
            provider.commit().unwrap();
        }

        // Verify RocksDB data exists
        assert!(rocksdb.last::<tables::TransactionHashNumbers>().unwrap().is_some());

        let provider = factory.database_provider_ro().unwrap();

        // checkpoint = 0 but RocksDB has data.
        // This means RocksDB has stale data that should be cleared.
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by clearing, no unwind needed");

        // Verify data was cleared
        for hash in &tx_hashes {
            assert!(
                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
                "RocksDB should be empty after pruning"
            );
        }
    }
677
678    #[test]
679    fn test_check_consistency_storages_history_empty_with_checkpoint_is_first_run() {
680        let temp_dir = TempDir::new().unwrap();
681        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
682
683        // Create a test provider factory for MDBX
684        let factory = create_test_provider_factory();
685        factory.set_storage_settings_cache(StorageSettings::v2());
686
687        // Set a checkpoint indicating we should have processed up to block 100
688        {
689            let provider = factory.database_provider_rw().unwrap();
690            provider
691                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
692                .unwrap();
693            provider.commit().unwrap();
694        }
695
696        let provider = factory.database_provider_ro().unwrap();
697
698        // RocksDB is empty but checkpoint says block 100 was processed.
699        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
700        // This should never happen in normal operation.
701        let result = rocksdb.check_consistency(&provider).unwrap();
702        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
703    }
704
    #[test]
    fn test_check_consistency_storages_history_preserves_genesis_entries_at_checkpoint_zero(
    ) -> eyre::Result<()> {
        // Modify mainnet chainspec to include a single genesis storage slot
        let mut chain_spec = MAINNET.clone();
        Arc::make_mut(&mut chain_spec).genesis.alloc.first_entry().unwrap().get_mut().storage =
            Some(From::from([(B256::random(), B256::random())]));

        // Create a test provider factory for MDBX with NO checkpoint
        let factory = create_test_provider_factory_with_chain_spec(chain_spec);
        let rocksdb = factory.rocksdb_provider();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // Insert genesis history into RocksDB
        let provider_rw = factory.database_provider_rw().unwrap();
        insert_genesis_history(&provider_rw, factory.chain_spec().genesis.alloc.iter())?;
        provider_rw.commit()?;

        let provider = factory.database_provider_ro().unwrap();

        // This should not prune anything because only genesis entries are present
        // (the checkpoint-0 fast path clears and then re-inserts genesis shards).
        let result = rocksdb.heal_storages_history(&provider).unwrap();
        assert_eq!(result, None, "Should skip healing when only genesis entries present");

        // Verify data was NOT deleted
        assert!(
            rocksdb.iter::<tables::StoragesHistory>().unwrap().count() > 0,
            "Genesis entries should be preserved"
        );

        Ok(())
    }
737
    /// Tests that when the checkpoint is ahead of the static file tip, an unwind down to
    /// the static file tip is requested.
    #[test]
    fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // Generate blocks with real transactions (blocks 0-2, 6 transactions total)
        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=2,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        // Insert the blocks and mirror their tx hashes into RocksDB.
        {
            let provider = factory.database_provider_rw().unwrap();
            let mut tx_count = 0u64;
            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }
            provider.commit().unwrap();
        }

        // Set checkpoint to block 10 (beyond our actual data at block 2)
        // sf_tip is at block 2, checkpoint is at block 10
        // Since sf_tip < checkpoint, we need to unwind to sf_tip
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(10))
                .unwrap();
            // Reset history checkpoints so they don't interfere
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
                .unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // sf_tip (2) < checkpoint (10), so unwind to sf_tip is needed
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(2), "sf_tip < checkpoint requires unwind to sf_tip");
    }
794
    /// `TransactionHashNumbers` invariant: when RocksDB (and static files) are
    /// *ahead* of the `TransactionLookup` checkpoint — a crash-recovery scenario
    /// where MDBX was unwound but RocksDB was not — `check_consistency` must heal
    /// by pruning the excess hash entries and must NOT request an unwind.
    #[test]
    fn test_check_consistency_rocksdb_ahead_of_checkpoint_prunes_excess() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // Generate blocks with real transactions:
        // Blocks 0-5, each with 2 transactions = 12 total transactions (0-11)
        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=5,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        // Track which hashes belong to which blocks
        let mut tx_hashes = Vec::new();
        let mut tx_count = 0u64;
        {
            let provider = factory.database_provider_rw().unwrap();
            // Insert ALL blocks (0-5) to write transactions to static files
            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    tx_hashes.push(hash);
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }
            provider.commit().unwrap();
        }

        // Simulate crash recovery scenario:
        // MDBX was unwound to block 2, but RocksDB and static files still have more data.
        // Remove TransactionBlocks entries for blocks 3-5 to simulate MDBX unwind.
        {
            let provider = factory.database_provider_rw().unwrap();
            // Delete TransactionBlocks entries for tx > 5 (i.e., for blocks 3-5)
            // TransactionBlocks maps last_tx_in_block -> block_number
            // After unwind, only entries for blocks 0-2 should remain (tx 5 -> block 2)
            let mut cursor = provider.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();
            // Walk and delete entries where block > 2
            let mut to_delete = Vec::new();
            let mut walker = cursor.walk(Some(0)).unwrap();
            while let Some((tx_num, block_num)) = walker.next().transpose().unwrap() {
                if block_num > 2 {
                    to_delete.push(tx_num);
                }
            }
            // Drop the walker before reusing the cursor for deletion: it holds a
            // mutable borrow of the cursor.
            drop(walker);
            for tx_num in to_delete {
                cursor.seek_exact(tx_num).unwrap();
                cursor.delete_current().unwrap();
            }

            // Set checkpoint to block 2
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
                .unwrap();
            // Reset history checkpoints so they don't interfere
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
                .unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB has tx hashes for all blocks (0-5)
        // MDBX TransactionBlocks only goes up to tx 5 (block 2)
        // Static files have data for all txs (0-11)
        // This means RocksDB is ahead and should prune entries for tx 6-11
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        // Verify: hashes for blocks 0-2 (tx 0-5) should remain, while hashes for
        // blocks 3-5 (tx 6-11) should be pruned.
        // First 6 hashes should remain
        for (i, hash) in tx_hashes.iter().take(6).enumerate() {
            assert!(
                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_some(),
                "tx {} should remain",
                i
            );
        }
        // Last 6 hashes should be pruned
        for (i, hash) in tx_hashes.iter().skip(6).enumerate() {
            assert!(
                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
                "tx {} should be pruned",
                i + 6
            );
        }
    }
897
898    #[test]
899    fn test_check_consistency_storages_history_sentinel_only_with_checkpoint_is_first_run() {
900        let temp_dir = TempDir::new().unwrap();
901        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
902
903        // Insert ONLY sentinel entries (highest_block_number = u64::MAX)
904        // This simulates a scenario where history tracking started but no shards were completed
905        let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
906        let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
907        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
908        rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
909        rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
910
911        // Verify entries exist (not empty table)
912        assert!(rocksdb.first::<tables::StoragesHistory>().unwrap().is_some());
913
914        // Create a test provider factory for MDBX
915        let factory = create_test_provider_factory();
916        factory.set_storage_settings_cache(StorageSettings::v2());
917
918        // Set a checkpoint indicating we should have processed up to block 100
919        {
920            let provider = factory.database_provider_rw().unwrap();
921            provider
922                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
923                .unwrap();
924            provider.commit().unwrap();
925        }
926
927        let provider = factory.database_provider_ro().unwrap();
928
929        // RocksDB has only sentinel entries but checkpoint is set.
930        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
931        // This should never happen in normal operation.
932        let result = rocksdb.check_consistency(&provider).unwrap();
933        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
934    }
935
936    #[test]
937    fn test_check_consistency_accounts_history_sentinel_only_with_checkpoint_is_first_run() {
938        use reth_db_api::models::ShardedKey;
939
940        let temp_dir = TempDir::new().unwrap();
941        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
942
943        // Insert ONLY sentinel entries (highest_block_number = u64::MAX)
944        let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
945        let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
946        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
947        rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
948        rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
949
950        // Verify entries exist (not empty table)
951        assert!(rocksdb.first::<tables::AccountsHistory>().unwrap().is_some());
952
953        // Create a test provider factory for MDBX
954        let factory = create_test_provider_factory();
955        factory.set_storage_settings_cache(StorageSettings::v2());
956
957        // Set a checkpoint indicating we should have processed up to block 100
958        {
959            let provider = factory.database_provider_rw().unwrap();
960            provider
961                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
962                .unwrap();
963            provider.commit().unwrap();
964        }
965
966        let provider = factory.database_provider_ro().unwrap();
967
968        // RocksDB has only sentinel entries but checkpoint is set.
969        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
970        // This should never happen in normal operation.
971        let result = rocksdb.check_consistency(&provider).unwrap();
972        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
973    }
974
    /// Test that pruning works by fetching transactions and computing their hashes,
    /// rather than iterating all rows. This test uses random blocks with unique
    /// transactions so we can verify the correct entries are pruned.
    #[test]
    fn test_prune_transaction_hash_numbers_by_range() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // Generate random blocks with unique transactions.
        // Blocks 0-5 each get 2 transactions (tx_count range 2..3),
        // so 12 transactions total (tx numbers 0-11).
        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=5,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        // Insert blocks into the database
        let mut tx_count = 0u64;
        let mut tx_hashes = Vec::new();
        {
            let provider = factory.database_provider_rw().unwrap();

            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();

                // Store transaction hash -> tx_number mappings in RocksDB
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    tx_hashes.push(hash);
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }

            // Set checkpoint to block 2 (meaning we should only have tx hashes for blocks 0-2)
            // Blocks 0, 1, 2 have 6 transactions (2 each), so tx 0-5 should remain
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
                .unwrap();
            provider.commit().unwrap();
        }

        // At this point:
        // - RocksDB has tx hashes for blocks 0-5 (12 total: 2 per block)
        // - Checkpoint says we only processed up to block 2
        // - We need to prune tx hashes for blocks 3, 4, 5 (tx 6-11)

        // Verify RocksDB has the expected number of entries before pruning
        let rocksdb_count_before: usize =
            rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().count();
        assert_eq!(
            rocksdb_count_before, tx_count as usize,
            "RocksDB should have all {} transaction hashes before pruning",
            tx_count
        );

        let provider = factory.database_provider_ro().unwrap();

        // Verify we can fetch transactions by tx range
        let all_txs = provider.transactions_by_tx_range(0..tx_count).unwrap();
        assert_eq!(all_txs.len(), tx_count as usize, "Should be able to fetch all transactions");

        // Verify the hashes match between what we stored and what we compute from fetched txs
        for (i, tx) in all_txs.iter().enumerate() {
            let computed_hash = tx.trie_hash();
            assert_eq!(
                computed_hash, tx_hashes[i],
                "Hash mismatch for tx {}: stored {:?} vs computed {:?}",
                i, tx_hashes[i], computed_hash
            );
        }

        // Blocks 0, 1, 2 have 2 tx each = 6 tx total (indices 0-5)
        // We want to keep tx 0-5, prune tx 6-11
        let max_tx_to_keep = 5u64;
        let tx_to_prune_start = max_tx_to_keep + 1;

        // Prune transactions 6-11 (blocks 3-5)
        rocksdb
            .prune_transaction_hash_numbers_in_range(&provider, tx_to_prune_start..=(tx_count - 1))
            .expect("prune should succeed");

        // Verify: transactions 0-5 should remain, 6-11 should be pruned
        let mut remaining_count = 0;
        for result in rocksdb.iter::<tables::TransactionHashNumbers>().unwrap() {
            let (_hash, tx_num) = result.unwrap();
            assert!(
                tx_num <= max_tx_to_keep,
                "Transaction {} should have been pruned (> {})",
                tx_num,
                max_tx_to_keep
            );
            remaining_count += 1;
        }
        assert_eq!(
            remaining_count,
            (max_tx_to_keep + 1) as usize,
            "Should have {} transactions (0-{})",
            max_tx_to_keep + 1,
            max_tx_to_keep
        );
    }
1088
1089    #[test]
1090    fn test_check_consistency_accounts_history_empty_with_checkpoint_is_first_run() {
1091        let temp_dir = TempDir::new().unwrap();
1092        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1093
1094        // Create a test provider factory for MDBX
1095        let factory = create_test_provider_factory();
1096        factory.set_storage_settings_cache(StorageSettings::v2());
1097
1098        // Set a checkpoint indicating we should have processed up to block 100
1099        {
1100            let provider = factory.database_provider_rw().unwrap();
1101            provider
1102                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
1103                .unwrap();
1104            provider.commit().unwrap();
1105        }
1106
1107        let provider = factory.database_provider_ro().unwrap();
1108
1109        // RocksDB is empty but checkpoint says block 100 was processed.
1110        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
1111        // This should never happen in normal operation.
1112        let result = rocksdb.check_consistency(&provider).unwrap();
1113        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
1114    }
1115
1116    #[test]
1117    fn test_check_consistency_accounts_history_preserves_genesis_entries_at_checkpoint_zero(
1118    ) -> eyre::Result<()> {
1119        // Create a test provider factory for MDBX with NO checkpoint
1120        let factory = create_test_provider_factory();
1121        factory.set_storage_settings_cache(StorageSettings::v2());
1122        let rocksdb = factory.rocksdb_provider();
1123
1124        // Insert genesis history into RocksDB
1125        let provider_rw = factory.database_provider_rw().unwrap();
1126        insert_genesis_history(&provider_rw, factory.chain_spec().genesis.alloc.iter())?;
1127        provider_rw.commit()?;
1128
1129        let provider = factory.database_provider_ro().unwrap();
1130
1131        // This should not prune anything because only genesis entries are present
1132        let result = rocksdb.check_consistency(&provider).unwrap();
1133        assert_eq!(result, None, "Should heal by pruning, no unwind needed");
1134
1135        // Verify data was NOT deleted
1136        assert!(
1137            rocksdb.iter::<tables::AccountsHistory>().unwrap().count() > 0,
1138            "Genesis entries should be preserved"
1139        );
1140
1141        Ok(())
1142    }
1143
    /// When the account-changeset static-file tip exactly equals the
    /// `IndexAccountHistory` checkpoint, `check_consistency` must be a no-op:
    /// no unwind target is returned and no `AccountsHistory` entry is modified.
    #[test]
    fn test_check_consistency_accounts_history_sf_tip_equals_checkpoint_no_action() {
        use reth_db::models::AccountBeforeTx;
        use reth_db_api::models::ShardedKey;
        use reth_static_file_types::StaticFileSegment;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        // Insert some AccountsHistory entries with various highest_block_numbers
        let key1 = ShardedKey::new(Address::ZERO, 50);
        let key2 = ShardedKey::new(Address::random(), 75);
        let key3 = ShardedKey::new(Address::random(), u64::MAX); // sentinel
        let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
        let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 75]);
        let block_list3 = BlockNumberList::new_pre_sorted([80, 90, 100]);
        rocksdb.put::<tables::AccountsHistory>(key1, &block_list1).unwrap();
        rocksdb.put::<tables::AccountsHistory>(key2, &block_list2).unwrap();
        rocksdb.put::<tables::AccountsHistory>(key3, &block_list3).unwrap();

        // Capture RocksDB state before consistency check
        let entries_before: Vec<_> =
            rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|r| r.unwrap()).collect();
        assert_eq!(entries_before.len(), 3, "Should have 3 entries before check");

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // Write account changesets to static files for blocks 0-100
        {
            let sf_provider = factory.static_file_provider();
            let mut writer =
                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

            for block_num in 0..=100 {
                let changeset = vec![AccountBeforeTx { address: Address::random(), info: None }];
                writer.append_account_changeset(changeset, block_num).unwrap();
            }

            writer.commit().unwrap();
        }

        // Set IndexAccountHistory checkpoint to block 100 (same as sf_tip)
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // Verify sf_tip equals checkpoint (both at 100)
        let sf_tip = provider
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
            .unwrap();
        assert_eq!(sf_tip, 100, "Static file tip should be 100");

        // Run check_consistency - should return None (no unwind needed)
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "sf_tip == checkpoint should not require unwind");

        // Verify NO entries are deleted - RocksDB state unchanged
        let entries_after: Vec<_> =
            rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|r| r.unwrap()).collect();

        assert_eq!(
            entries_after.len(),
            entries_before.len(),
            "RocksDB entry count should be unchanged when sf_tip == checkpoint"
        );

        // Verify exact entries are preserved (iteration order is stable, so a
        // pairwise zip compares each entry with its pre-check counterpart)
        for (before, after) in entries_before.iter().zip(entries_after.iter()) {
            assert_eq!(before.0.key, after.0.key, "Entry key should be unchanged");
            assert_eq!(
                before.0.highest_block_number, after.0.highest_block_number,
                "Entry highest_block_number should be unchanged"
            );
            assert_eq!(before.1, after.1, "Entry block list should be unchanged");
        }
    }
1229
    /// Tests `StoragesHistory` changeset-based healing with enough blocks to trigger batching.
    ///
    /// Scenario:
    /// 1. Generate 15,000 blocks worth of storage changeset data (to exceed the 10k batch size)
    /// 2. Each block has 1 storage change (address + slot + value)
    /// 3. Write storage changesets to static files for all 15k blocks (0-14999)
    /// 4. Set `IndexStorageHistory` checkpoint to block 5000
    /// 5. Insert stale `StoragesHistory` entries in `RocksDB` for (address, slot) pairs that
    ///    changed in blocks 5001-14999
    /// 6. Run `check_consistency`
    /// 7. Verify stale entries for blocks > 5000 are pruned and batching worked
    #[test]
    fn test_check_consistency_storages_history_heals_via_changesets_large_range() {
        use alloy_primitives::U256;
        use reth_db_api::models::StorageBeforeTx;

        const TOTAL_BLOCKS: u64 = 15_000;
        const CHECKPOINT_BLOCK: u64 = 5_000;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // Helper to generate a deterministic, unique address per block number
        // (reuses stack arrays)
        #[inline]
        fn make_address(block_num: u64) -> Address {
            let mut addr_bytes = [0u8; 20];
            addr_bytes[0..8].copy_from_slice(&block_num.to_le_bytes());
            Address::from(addr_bytes)
        }

        // Helper to generate a deterministic, unique slot per block number
        // (reuses stack arrays)
        #[inline]
        fn make_slot(block_num: u64) -> B256 {
            let mut slot_bytes = [0u8; 32];
            slot_bytes[0..8].copy_from_slice(&block_num.to_le_bytes());
            B256::from(slot_bytes)
        }

        // Write storage changesets to static files for 15k blocks.
        // Each block has 1 storage change with a unique (address, slot) pair.
        {
            let sf_provider = factory.static_file_provider();
            let mut writer =
                sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();

            // Reuse changeset vec to avoid repeated allocations
            let mut changeset = Vec::with_capacity(1);

            for block_num in 0..TOTAL_BLOCKS {
                changeset.clear();
                changeset.push(StorageBeforeTx {
                    address: make_address(block_num),
                    key: make_slot(block_num),
                    value: U256::from(block_num),
                });

                writer.append_storage_changeset(changeset.clone(), block_num).unwrap();
            }

            writer.commit().unwrap();
        }

        // Verify static files have data up to block 14999
        {
            let sf_provider = factory.static_file_provider();
            let highest = sf_provider
                .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
                .unwrap();
            assert_eq!(highest, TOTAL_BLOCKS - 1, "Static files should have blocks 0..14999");
        }

        // Set IndexStorageHistory checkpoint to block 5000
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(
                    StageId::IndexStorageHistory,
                    StageCheckpoint::new(CHECKPOINT_BLOCK),
                )
                .unwrap();
            provider.commit().unwrap();
        }

        // Insert stale StoragesHistory entries for blocks 5001-14999
        // These are (address, slot) pairs that changed after the checkpoint
        for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
            let key =
                StorageShardedKey::new(make_address(block_num), make_slot(block_num), block_num);
            let block_list = BlockNumberList::new_pre_sorted([block_num]);
            rocksdb.put::<tables::StoragesHistory>(key, &block_list).unwrap();
        }

        // Verify RocksDB has stale entries before healing
        let count_before: usize = rocksdb.iter::<tables::StoragesHistory>().unwrap().count();
        assert_eq!(
            count_before,
            (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize,
            "Should have {} stale entries before healing",
            TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1
        );

        // Run check_consistency - this should heal by pruning stale entries
        let provider = factory.database_provider_ro().unwrap();
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal via changesets, no unwind needed");

        // Verify all stale entries were pruned
        // After healing, entries with highest_block_number > checkpoint should be gone
        let mut remaining_stale = 0;
        for result in rocksdb.iter::<tables::StoragesHistory>().unwrap() {
            let (key, _) = result.unwrap();
            if key.sharded_key.highest_block_number > CHECKPOINT_BLOCK {
                remaining_stale += 1;
            }
        }
        assert_eq!(
            remaining_stale, 0,
            "All stale entries (block > {}) should be pruned",
            CHECKPOINT_BLOCK
        );
    }
1354
    /// Tests that healing preserves entries at exactly the checkpoint block.
    ///
    /// This catches off-by-one bugs where checkpoint block data is incorrectly deleted.
    #[test]
    fn test_check_consistency_storages_history_preserves_checkpoint_block() {
        use alloy_primitives::U256;
        use reth_db_api::models::StorageBeforeTx;

        const CHECKPOINT_BLOCK: u64 = 100;
        const SF_TIP: u64 = 200;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // Distinct (address, slot) pairs so the entry at the checkpoint block and
        // the stale entry past it can be told apart after healing.
        let checkpoint_addr = Address::repeat_byte(0xAA);
        let checkpoint_slot = B256::repeat_byte(0xBB);
        let stale_addr = Address::repeat_byte(0xCC);
        let stale_slot = B256::repeat_byte(0xDD);

        // Write storage changesets to static files:
        // - at CHECKPOINT_BLOCK: the checkpoint pair
        // - after CHECKPOINT_BLOCK: the stale pair
        // - before CHECKPOINT_BLOCK: filler (zero address/slot)
        {
            let sf_provider = factory.static_file_provider();
            let mut writer =
                sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();

            for block_num in 0..=SF_TIP {
                let changeset = if block_num == CHECKPOINT_BLOCK {
                    vec![StorageBeforeTx {
                        address: checkpoint_addr,
                        key: checkpoint_slot,
                        value: U256::from(block_num),
                    }]
                } else if block_num > CHECKPOINT_BLOCK {
                    vec![StorageBeforeTx {
                        address: stale_addr,
                        key: stale_slot,
                        value: U256::from(block_num),
                    }]
                } else {
                    vec![StorageBeforeTx {
                        address: Address::ZERO,
                        key: B256::ZERO,
                        value: U256::ZERO,
                    }]
                };
                writer.append_storage_changeset(changeset, block_num).unwrap();
            }
            writer.commit().unwrap();
        }

        // Set checkpoint
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(
                    StageId::IndexStorageHistory,
                    StageCheckpoint::new(CHECKPOINT_BLOCK),
                )
                .unwrap();
            provider.commit().unwrap();
        }

        // Insert entry AT the checkpoint block (should be preserved)
        let checkpoint_key =
            StorageShardedKey::new(checkpoint_addr, checkpoint_slot, CHECKPOINT_BLOCK);
        let checkpoint_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]);
        rocksdb.put::<tables::StoragesHistory>(checkpoint_key.clone(), &checkpoint_list).unwrap();

        // Insert stale entry AFTER the checkpoint (should be removed)
        let stale_key = StorageShardedKey::new(stale_addr, stale_slot, SF_TIP);
        let stale_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]);
        rocksdb.put::<tables::StoragesHistory>(stale_key.clone(), &stale_list).unwrap();

        // Run healing
        let provider = factory.database_provider_ro().unwrap();
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal without unwind");

        // Verify checkpoint block entry is PRESERVED
        let preserved = rocksdb.get::<tables::StoragesHistory>(checkpoint_key).unwrap();
        assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");

        // Verify stale entry is removed or unwound
        let stale = rocksdb.get::<tables::StoragesHistory>(stale_key).unwrap();
        assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
    }
1444
1445    /// Tests `AccountsHistory` changeset-based healing with enough blocks to trigger batching.
1446    ///
1447    /// Scenario:
1448    /// 1. Generate 15,000 blocks worth of account changeset data (to exceed the 10k batch size)
1449    /// 2. Each block has 1 account change (simple - just random addresses)
1450    /// 3. Write account changesets to static files for all 15k blocks
1451    /// 4. Set `IndexAccountHistory` checkpoint to block 5000
1452    /// 5. Insert stale `AccountsHistory` entries in `RocksDB` for addresses that changed in blocks
1453    ///    5001-15000
1454    /// 6. Run `check_consistency`
1455    /// 7. Verify:
1456    ///    - Stale entries for blocks > 5000 are pruned
1457    ///    - The batching worked (no OOM, completed successfully)
1458    #[test]
1459    fn test_check_consistency_accounts_history_heals_via_changesets_large_range() {
1460        use reth_db::models::AccountBeforeTx;
1461        use reth_db_api::models::ShardedKey;
1462        use reth_static_file_types::StaticFileSegment;
1463
1464        let temp_dir = TempDir::new().unwrap();
1465        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1466
1467        // Create test provider factory
1468        let factory = create_test_provider_factory();
1469        factory.set_storage_settings_cache(StorageSettings::v2());
1470
1471        const TOTAL_BLOCKS: u64 = 15_000;
1472        const CHECKPOINT_BLOCK: u64 = 5_000;
1473
1474        // Helper to generate address from block number (avoids pre-allocating 15k addresses)
1475        #[inline]
1476        fn make_address(block_num: u64) -> Address {
1477            let mut addr = Address::ZERO;
1478            addr.0[0..8].copy_from_slice(&block_num.to_le_bytes());
1479            addr
1480        }
1481
1482        // Write account changesets to static files for all 15k blocks
1483        {
1484            let sf_provider = factory.static_file_provider();
1485            let mut writer =
1486                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1487
1488            // Reuse changeset vec to avoid repeated allocations
1489            let mut changeset = Vec::with_capacity(1);
1490
1491            for block_num in 0..TOTAL_BLOCKS {
1492                changeset.clear();
1493                changeset.push(AccountBeforeTx { address: make_address(block_num), info: None });
1494                writer.append_account_changeset(changeset.clone(), block_num).unwrap();
1495            }
1496
1497            writer.commit().unwrap();
1498        }
1499
1500        // Insert stale AccountsHistory entries in RocksDB for addresses that changed
1501        // in blocks 5001-15000 (i.e., blocks after the checkpoint)
1502        // These should be pruned by check_consistency
1503        for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
1504            let key = ShardedKey::new(make_address(block_num), block_num);
1505            let block_list = BlockNumberList::new_pre_sorted([block_num]);
1506            rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
1507        }
1508
1509        // Also insert some valid entries for blocks <= 5000 that should NOT be pruned
1510        for block_num in [100u64, 500, 1000, 2500, 5000] {
1511            let key = ShardedKey::new(make_address(block_num), block_num);
1512            let block_list = BlockNumberList::new_pre_sorted([block_num]);
1513            rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
1514        }
1515
1516        // Verify we have entries before healing
1517        let entries_before: usize = rocksdb.iter::<tables::AccountsHistory>().unwrap().count();
1518        let stale_count = (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize;
1519        let valid_count = 5usize;
1520        assert_eq!(
1521            entries_before,
1522            stale_count + valid_count,
1523            "Should have {} stale + {} valid entries before healing",
1524            stale_count,
1525            valid_count
1526        );
1527
1528        // Set IndexAccountHistory checkpoint to block 5000
1529        {
1530            let provider = factory.database_provider_rw().unwrap();
1531            provider
1532                .save_stage_checkpoint(
1533                    StageId::IndexAccountHistory,
1534                    StageCheckpoint::new(CHECKPOINT_BLOCK),
1535                )
1536                .unwrap();
1537            provider.commit().unwrap();
1538        }
1539
1540        let provider = factory.database_provider_ro().unwrap();
1541
1542        // Verify sf_tip > checkpoint
1543        let sf_tip = provider
1544            .static_file_provider()
1545            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
1546            .unwrap();
1547        assert_eq!(sf_tip, TOTAL_BLOCKS - 1, "Static file tip should be 14999");
1548        assert!(sf_tip > CHECKPOINT_BLOCK, "sf_tip should be > checkpoint to trigger healing");
1549
1550        // Run check_consistency - this should trigger batched changeset-based healing
1551        let result = rocksdb.check_consistency(&provider).unwrap();
1552        assert_eq!(result, None, "Healing should succeed without requiring unwind");
1553
1554        // Verify: all stale entries for blocks > 5000 should be pruned
1555        // Count remaining entries with highest_block_number > checkpoint
1556        let mut remaining_stale = 0;
1557        for result in rocksdb.iter::<tables::AccountsHistory>().unwrap() {
1558            let (key, _) = result.unwrap();
1559            if key.highest_block_number > CHECKPOINT_BLOCK && key.highest_block_number != u64::MAX {
1560                remaining_stale += 1;
1561            }
1562        }
1563        assert_eq!(
1564            remaining_stale, 0,
1565            "All stale entries (block > {}) should be pruned",
1566            CHECKPOINT_BLOCK
1567        );
1568    }
1569
1570    /// Tests that accounts history healing preserves entries at exactly the checkpoint block.
1571    #[test]
1572    fn test_check_consistency_accounts_history_preserves_checkpoint_block() {
1573        use reth_db::models::AccountBeforeTx;
1574        use reth_db_api::models::ShardedKey;
1575
1576        const CHECKPOINT_BLOCK: u64 = 100;
1577        const SF_TIP: u64 = 200;
1578
1579        let temp_dir = TempDir::new().unwrap();
1580        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1581
1582        let factory = create_test_provider_factory();
1583        factory.set_storage_settings_cache(StorageSettings::v2());
1584
1585        let checkpoint_addr = Address::repeat_byte(0xAA);
1586        let stale_addr = Address::repeat_byte(0xCC);
1587
1588        // Write account changesets to static files
1589        {
1590            let sf_provider = factory.static_file_provider();
1591            let mut writer =
1592                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1593
1594            for block_num in 0..=SF_TIP {
1595                let changeset = if block_num == CHECKPOINT_BLOCK {
1596                    vec![AccountBeforeTx { address: checkpoint_addr, info: None }]
1597                } else if block_num > CHECKPOINT_BLOCK {
1598                    vec![AccountBeforeTx { address: stale_addr, info: None }]
1599                } else {
1600                    vec![AccountBeforeTx { address: Address::ZERO, info: None }]
1601                };
1602                writer.append_account_changeset(changeset, block_num).unwrap();
1603            }
1604            writer.commit().unwrap();
1605        }
1606
1607        // Set checkpoint
1608        {
1609            let provider = factory.database_provider_rw().unwrap();
1610            provider
1611                .save_stage_checkpoint(
1612                    StageId::IndexAccountHistory,
1613                    StageCheckpoint::new(CHECKPOINT_BLOCK),
1614                )
1615                .unwrap();
1616            provider.commit().unwrap();
1617        }
1618
1619        // Insert entry AT the checkpoint block (should be preserved)
1620        let checkpoint_key = ShardedKey::new(checkpoint_addr, CHECKPOINT_BLOCK);
1621        let checkpoint_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]);
1622        rocksdb.put::<tables::AccountsHistory>(checkpoint_key.clone(), &checkpoint_list).unwrap();
1623
1624        // Insert stale entry AFTER the checkpoint (should be removed)
1625        let stale_key = ShardedKey::new(stale_addr, SF_TIP);
1626        let stale_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]);
1627        rocksdb.put::<tables::AccountsHistory>(stale_key.clone(), &stale_list).unwrap();
1628
1629        // Run healing
1630        let provider = factory.database_provider_ro().unwrap();
1631        let result = rocksdb.check_consistency(&provider).unwrap();
1632        assert_eq!(result, None, "Should heal without unwind");
1633
1634        // Verify checkpoint block entry is PRESERVED
1635        let preserved = rocksdb.get::<tables::AccountsHistory>(checkpoint_key).unwrap();
1636        assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");
1637
1638        // Verify stale entry is removed or unwound
1639        let stale = rocksdb.get::<tables::AccountsHistory>(stale_key).unwrap();
1640        assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
1641    }
1642
1643    #[test]
1644    fn test_check_consistency_storages_history_sf_tip_equals_checkpoint_no_action() {
1645        use alloy_primitives::U256;
1646        use reth_db::models::StorageBeforeTx;
1647        use reth_static_file_types::StaticFileSegment;
1648
1649        let temp_dir = TempDir::new().unwrap();
1650        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1651
1652        // Insert StoragesHistory entries into RocksDB
1653        let key1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
1654        let key2 = StorageShardedKey::new(Address::random(), B256::random(), 80);
1655        let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
1656        let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 80]);
1657        rocksdb.put::<tables::StoragesHistory>(key1, &block_list1).unwrap();
1658        rocksdb.put::<tables::StoragesHistory>(key2, &block_list2).unwrap();
1659
1660        // Capture entries before consistency check
1661        let entries_before: Vec<_> =
1662            rocksdb.iter::<tables::StoragesHistory>().unwrap().map(|r| r.unwrap()).collect();
1663
1664        // Create a test provider factory
1665        let factory = create_test_provider_factory();
1666        factory.set_storage_settings_cache(StorageSettings::v2());
1667
1668        // Write storage changesets to static files for blocks 0-100
1669        {
1670            let sf_provider = factory.static_file_provider();
1671            let mut writer =
1672                sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1673
1674            for block_num in 0..=100u64 {
1675                let changeset = vec![StorageBeforeTx {
1676                    address: Address::ZERO,
1677                    key: B256::with_last_byte(block_num as u8),
1678                    value: U256::from(block_num),
1679                }];
1680                writer.append_storage_changeset(changeset, block_num).unwrap();
1681            }
1682            writer.commit().unwrap();
1683        }
1684
1685        // Set IndexStorageHistory checkpoint to block 100 (same as sf_tip)
1686        {
1687            let provider = factory.database_provider_rw().unwrap();
1688            provider
1689                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
1690                .unwrap();
1691            provider.commit().unwrap();
1692        }
1693
1694        let provider = factory.database_provider_ro().unwrap();
1695
1696        // Verify sf_tip equals checkpoint (both at 100)
1697        let sf_tip = provider
1698            .static_file_provider()
1699            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
1700            .unwrap();
1701        assert_eq!(sf_tip, 100, "Static file tip should be 100");
1702
1703        // Run check_consistency - should return None (no unwind needed)
1704        let result = rocksdb.check_consistency(&provider).unwrap();
1705        assert_eq!(result, None, "sf_tip == checkpoint should not require unwind");
1706
1707        // Verify NO entries are deleted - RocksDB state unchanged
1708        let entries_after: Vec<_> =
1709            rocksdb.iter::<tables::StoragesHistory>().unwrap().map(|r| r.unwrap()).collect();
1710
1711        assert_eq!(
1712            entries_after.len(),
1713            entries_before.len(),
1714            "RocksDB entry count should be unchanged when sf_tip == checkpoint"
1715        );
1716
1717        // Verify exact entries are preserved
1718        for (before, after) in entries_before.iter().zip(entries_after.iter()) {
1719            assert_eq!(before.0, after.0, "Entry key should be unchanged");
1720            assert_eq!(before.1, after.1, "Entry block list should be unchanged");
1721        }
1722    }
1723}