Skip to main content

reth_provider/providers/rocksdb/
invariants.rs

//! Invariant checking for `RocksDB` tables.
//!
//! This module provides consistency checks for tables stored in `RocksDB`, similar to the
//! consistency checks for static files. The goal is to detect and potentially heal
//! inconsistencies between `RocksDB` data and MDBX checkpoints.
6
7use super::RocksDBProvider;
8use crate::StaticFileProviderFactory;
9use alloy_consensus::transaction::TxHashRef;
10use alloy_primitives::BlockNumber;
11use rayon::prelude::*;
12use reth_chainspec::{ChainSpecProvider, EthChainSpec};
13use reth_db::models::{storage_sharded_key::StorageShardedKey, ShardedKey};
14use reth_db_api::tables;
15use reth_stages_types::StageId;
16use reth_static_file_types::StaticFileSegment;
17use reth_storage_api::{
18    BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StageCheckpointReader,
19    StorageChangeSetReader, StorageSettingsCache, TransactionsProvider,
20};
21use reth_storage_errors::provider::ProviderResult;
22use std::collections::HashSet;
23
/// Number of blocks whose changesets are loaded per iteration while healing the history tables.
///
/// Keeps the memory held by a single batch bounded without paying per-block iteration overhead.
const HEAL_HISTORY_BATCH_SIZE: u64 = 10_000;
27
28impl RocksDBProvider {
29    /// Checks consistency of `RocksDB` tables against MDBX stage checkpoints.
30    ///
31    /// Returns an unwind target block number if the pipeline needs to unwind to rebuild
32    /// `RocksDB` data. Returns `None` if all invariants pass or if inconsistencies were healed.
33    ///
34    /// # Invariants checked
35    ///
36    /// For `TransactionHashNumbers`:
37    /// - The maximum `TxNumber` value should not exceed what the `TransactionLookup` stage
38    ///   checkpoint indicates has been processed.
39    /// - If `RocksDB` is ahead, excess entries are pruned (healed).
40    /// - If `RocksDB` is behind, an unwind is required.
41    ///
42    /// For `StoragesHistory` and `AccountsHistory`:
43    /// - Uses changesets to heal stale entries when static file tip > checkpoint.
44    ///
45    /// # Requirements
46    ///
47    /// For pruning `TransactionHashNumbers`, the provider must be able to supply transaction
48    /// data (typically from static files) so that transaction hashes can be read. This
49    /// implies that static files should be ahead of or in sync with `RocksDB`.
50    pub fn check_consistency<Provider>(
51        &self,
52        provider: &Provider,
53    ) -> ProviderResult<Option<BlockNumber>>
54    where
55        Provider: DBProvider
56            + StageCheckpointReader
57            + StorageSettingsCache
58            + StaticFileProviderFactory
59            + BlockBodyIndicesProvider
60            + StorageChangeSetReader
61            + ChangeSetReader
62            + TransactionsProvider
63            + ChainSpecProvider,
64    {
65        let mut unwind_target: Option<BlockNumber> = None;
66
67        // Heal TransactionHashNumbers if stored in RocksDB
68        if provider.cached_storage_settings().storage_v2 &&
69            let Some(target) = self.heal_transaction_hash_numbers(provider)?
70        {
71            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
72        }
73
74        // Heal StoragesHistory if stored in RocksDB
75        if provider.cached_storage_settings().storage_v2 &&
76            let Some(target) = self.heal_storages_history(provider)?
77        {
78            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
79        }
80
81        // Heal AccountsHistory if stored in RocksDB
82        if provider.cached_storage_settings().storage_v2 &&
83            let Some(target) = self.heal_accounts_history(provider)?
84        {
85            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
86        }
87
88        Ok(unwind_target)
89    }
90
91    /// Heals the `TransactionHashNumbers` table.
92    ///
93    /// - Fast path: if checkpoint == 0, clear any stale data and return
94    /// - If `sf_tip` < checkpoint, return unwind target (static files behind)
95    /// - If `sf_tip` == checkpoint, nothing to do
96    /// - If `sf_tip` > checkpoint, heal via transaction ranges in batches
97    fn heal_transaction_hash_numbers<Provider>(
98        &self,
99        provider: &Provider,
100    ) -> ProviderResult<Option<BlockNumber>>
101    where
102        Provider: DBProvider
103            + StageCheckpointReader
104            + StaticFileProviderFactory
105            + BlockBodyIndicesProvider
106            + TransactionsProvider,
107    {
108        let checkpoint = provider
109            .get_stage_checkpoint(StageId::TransactionLookup)?
110            .map(|cp| cp.block_number)
111            .unwrap_or(0);
112
113        let sf_tip = provider
114            .static_file_provider()
115            .get_highest_static_file_block(StaticFileSegment::Transactions)
116            .unwrap_or(0);
117
118        // Fast path: clear any stale data and return.
119        if checkpoint == 0 {
120            if self.first::<tables::TransactionHashNumbers>()?.is_some() {
121                tracing::info!(
122                    target: "reth::providers::rocksdb",
123                    "TransactionHashNumbers: checkpoint is 0, clearing stale data"
124                );
125                self.clear::<tables::TransactionHashNumbers>()?;
126            }
127
128            return Ok(None);
129        }
130
131        if sf_tip < checkpoint {
132            // This should never happen in normal operation - static files are always committed
133            // before RocksDB. If we get here, something is seriously wrong. The unwind is a
134            // best-effort attempt but is probably futile.
135            tracing::warn!(
136                target: "reth::providers::rocksdb",
137                sf_tip,
138                checkpoint,
139                "TransactionHashNumbers: static file tip behind checkpoint, unwind needed"
140            );
141            return Ok(Some(sf_tip));
142        }
143
144        // sf_tip == checkpoint - nothing to do
145        if sf_tip == checkpoint {
146            return Ok(None);
147        }
148
149        // Get end tx from static files (authoritative for sf_tip)
150        let sf_tip_end_tx = provider
151            .static_file_provider()
152            .get_highest_static_file_tx(StaticFileSegment::Transactions)
153            .unwrap_or(0);
154
155        // Get the first tx after the checkpoint block from MDBX (authoritative up to checkpoint)
156        let checkpoint_next_tx = provider
157            .block_body_indices(checkpoint)?
158            .map(|indices| indices.next_tx_num())
159            .unwrap_or(0);
160
161        if sf_tip_end_tx < checkpoint_next_tx {
162            // This should never happen in normal operation - static files should have all
163            // transactions up to sf_tip. If we get here, something is seriously wrong.
164            // The unwind is a best-effort attempt but is probably futile.
165            tracing::warn!(
166                target: "reth::providers::rocksdb",
167                sf_tip_end_tx,
168                checkpoint_next_tx,
169                checkpoint,
170                sf_tip,
171                "TransactionHashNumbers: static file tx tip behind checkpoint, unwind needed"
172            );
173            return Ok(Some(sf_tip));
174        }
175
176        tracing::info!(
177            target: "reth::providers::rocksdb",
178            checkpoint,
179            sf_tip,
180            checkpoint_next_tx,
181            sf_tip_end_tx,
182            "TransactionHashNumbers: healing via transaction ranges"
183        );
184
185        const BATCH_SIZE: u64 = 10_000;
186        let mut batch_start = checkpoint_next_tx;
187
188        while batch_start <= sf_tip_end_tx {
189            let batch_end = batch_start.saturating_add(BATCH_SIZE - 1).min(sf_tip_end_tx);
190
191            tracing::debug!(
192                target: "reth::providers::rocksdb",
193                batch_start,
194                batch_end,
195                "Pruning TransactionHashNumbers batch"
196            );
197
198            self.prune_transaction_hash_numbers_in_range(provider, batch_start..=batch_end)?;
199
200            batch_start = batch_end.saturating_add(1);
201        }
202
203        Ok(None)
204    }
205
206    /// Prunes `TransactionHashNumbers` entries for transactions in the given range.
207    ///
208    /// This fetches transactions from the provider, reads their hashes in parallel,
209    /// and deletes the corresponding entries from `RocksDB` by key. This approach is more
210    /// scalable than iterating all rows because it only processes the transactions that
211    /// need to be pruned.
212    ///
213    /// # Requirements
214    ///
215    /// The provider must be able to supply transaction data (typically from static files)
216    /// so that transaction hashes can be read. This implies that static files should
217    /// be ahead of or in sync with `RocksDB`.
218    fn prune_transaction_hash_numbers_in_range<Provider>(
219        &self,
220        provider: &Provider,
221        tx_range: std::ops::RangeInclusive<u64>,
222    ) -> ProviderResult<()>
223    where
224        Provider: TransactionsProvider,
225    {
226        if tx_range.is_empty() {
227            return Ok(());
228        }
229
230        // Fetch transactions in the range and read their hashes in parallel.
231        let hashes: Vec<_> = provider
232            .transactions_by_tx_range(tx_range.clone())?
233            .into_par_iter()
234            .map(|tx| *tx.tx_hash())
235            .collect();
236
237        if !hashes.is_empty() {
238            tracing::info!(
239                target: "reth::providers::rocksdb",
240                deleted_count = hashes.len(),
241                tx_range_start = *tx_range.start(),
242                tx_range_end = *tx_range.end(),
243                "Pruning TransactionHashNumbers entries by tx range"
244            );
245
246            let mut batch = self.batch();
247            for hash in hashes {
248                batch.delete::<tables::TransactionHashNumbers>(hash)?;
249            }
250            batch.commit()?;
251        }
252
253        Ok(())
254    }
255
256    /// Heals the `StoragesHistory` table by removing stale entries.
257    ///
258    /// Returns an unwind target if static file tip is behind checkpoint (cannot heal).
259    /// Otherwise iterates changesets in batches to identify and unwind affected keys.
260    fn heal_storages_history<Provider>(
261        &self,
262        provider: &Provider,
263    ) -> ProviderResult<Option<BlockNumber>>
264    where
265        Provider: DBProvider
266            + StageCheckpointReader
267            + StaticFileProviderFactory
268            + StorageChangeSetReader
269            + ChainSpecProvider,
270    {
271        let checkpoint = provider
272            .get_stage_checkpoint(StageId::IndexStorageHistory)?
273            .map(|cp| cp.block_number)
274            .unwrap_or(0);
275
276        let sf_tip = provider
277            .static_file_provider()
278            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
279            .unwrap_or(0);
280
281        if sf_tip < checkpoint {
282            // This should never happen in normal operation - static files are always
283            // committed before RocksDB. If we get here, something is seriously wrong.
284            // The unwind is a best-effort attempt but is probably futile.
285            tracing::warn!(
286                target: "reth::providers::rocksdb",
287                sf_tip,
288                checkpoint,
289                "StoragesHistory: static file tip behind checkpoint, unwind needed"
290            );
291            return Ok(Some(sf_tip));
292        }
293
294        if sf_tip == checkpoint {
295            return Ok(None);
296        }
297
298        // Fast path: clear and re-insert genesis history.
299        if checkpoint == 0 {
300            tracing::info!(
301                target: "reth::providers::rocksdb",
302                "StoragesHistory: checkpoint is 0, clearing stale data"
303            );
304            self.clear::<tables::StoragesHistory>()?;
305
306            let chain_spec = provider.chain_spec();
307            let genesis = chain_spec.genesis();
308            let list = tables::BlockNumberList::new([0]).expect("single block always fits");
309            for (addr, account) in &genesis.alloc {
310                if let Some(storage) = &account.storage {
311                    for key in storage.keys() {
312                        self.put::<tables::StoragesHistory>(
313                            StorageShardedKey::last(*addr, *key),
314                            &list,
315                        )?;
316                    }
317                }
318            }
319
320            return Ok(None);
321        }
322
323        let total_blocks = sf_tip - checkpoint;
324        tracing::info!(
325            target: "reth::providers::rocksdb",
326            checkpoint,
327            sf_tip,
328            total_blocks,
329            "StoragesHistory: healing via changesets"
330        );
331
332        let mut batch_start = checkpoint + 1;
333        let mut batch_num = 0u64;
334        let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
335
336        while batch_start <= sf_tip {
337            let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
338            batch_num += 1;
339
340            let changesets = provider.storage_changesets_range(batch_start..=batch_end)?;
341
342            let unique_keys: HashSet<_> = changesets
343                .into_iter()
344                .map(|(block_addr, entry)| (block_addr.address(), entry.key, checkpoint + 1))
345                .collect();
346            let indices: Vec<_> = unique_keys.into_iter().collect();
347
348            if !indices.is_empty() {
349                tracing::info!(
350                    target: "reth::providers::rocksdb",
351                    batch_num,
352                    total_batches,
353                    batch_start,
354                    batch_end,
355                    indices_count = indices.len(),
356                    "StoragesHistory: unwinding batch"
357                );
358
359                let batch = self.unwind_storage_history_indices(&indices)?;
360                self.commit_batch(batch)?;
361            }
362
363            batch_start = batch_end + 1;
364        }
365
366        Ok(None)
367    }
368
369    /// Heals the `AccountsHistory` table by removing stale entries.
370    ///
371    /// Returns an unwind target if static file tip is behind checkpoint (cannot heal).
372    /// Otherwise iterates changesets in batches to identify and unwind affected keys.
373    fn heal_accounts_history<Provider>(
374        &self,
375        provider: &Provider,
376    ) -> ProviderResult<Option<BlockNumber>>
377    where
378        Provider: DBProvider
379            + StageCheckpointReader
380            + StaticFileProviderFactory
381            + ChangeSetReader
382            + ChainSpecProvider,
383    {
384        let checkpoint = provider
385            .get_stage_checkpoint(StageId::IndexAccountHistory)?
386            .map(|cp| cp.block_number)
387            .unwrap_or(0);
388
389        let sf_tip = provider
390            .static_file_provider()
391            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
392            .unwrap_or(0);
393
394        if sf_tip < checkpoint {
395            // This should never happen in normal operation - static files are always
396            // committed before RocksDB. If we get here, something is seriously wrong.
397            // The unwind is a best-effort attempt but is probably futile.
398            tracing::warn!(
399                target: "reth::providers::rocksdb",
400                sf_tip,
401                checkpoint,
402                "AccountsHistory: static file tip behind checkpoint, unwind needed"
403            );
404            return Ok(Some(sf_tip));
405        }
406
407        if sf_tip == checkpoint {
408            return Ok(None);
409        }
410
411        // Fast path: clear and re-insert genesis history.
412        if checkpoint == 0 {
413            tracing::info!(
414                target: "reth::providers::rocksdb",
415                "AccountsHistory: checkpoint is 0, clearing stale data"
416            );
417            self.clear::<tables::AccountsHistory>()?;
418
419            let chain_spec = provider.chain_spec();
420            let genesis = chain_spec.genesis();
421            let list = tables::BlockNumberList::new([0]).expect("single block always fits");
422            for addr in genesis.alloc.keys() {
423                self.put::<tables::AccountsHistory>(ShardedKey::last(*addr), &list)?;
424            }
425
426            return Ok(None);
427        }
428
429        let total_blocks = sf_tip - checkpoint;
430        tracing::info!(
431            target: "reth::providers::rocksdb",
432            checkpoint,
433            sf_tip,
434            total_blocks,
435            "AccountsHistory: healing via changesets"
436        );
437
438        let mut batch_start = checkpoint + 1;
439        let mut batch_num = 0u64;
440        let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
441
442        while batch_start <= sf_tip {
443            let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
444            batch_num += 1;
445
446            let changesets = provider.account_changesets_range(batch_start..=batch_end)?;
447
448            let mut addresses = HashSet::with_capacity(changesets.len());
449            addresses.extend(changesets.iter().map(|(_, cs)| cs.address));
450            let unwind_from = checkpoint + 1;
451            let indices: Vec<_> = addresses.into_iter().map(|addr| (addr, unwind_from)).collect();
452
453            if !indices.is_empty() {
454                tracing::info!(
455                    target: "reth::providers::rocksdb",
456                    batch_num,
457                    total_batches,
458                    batch_start,
459                    batch_end,
460                    indices_count = indices.len(),
461                    "AccountsHistory: unwinding batch"
462                );
463
464                let batch = self.unwind_account_history_indices(&indices)?;
465                self.commit_batch(batch)?;
466            }
467
468            batch_start = batch_end + 1;
469        }
470
471        Ok(None)
472    }
473}
474
475#[cfg(test)]
476mod tests {
477    use std::sync::Arc;
478
479    use super::*;
480    use crate::{
481        init::insert_genesis_history,
482        providers::{rocksdb::RocksDBBuilder, static_file::StaticFileWriter},
483        test_utils::{create_test_provider_factory, create_test_provider_factory_with_chain_spec},
484        BlockWriter, DatabaseProviderFactory, RocksDBProviderFactory, StageCheckpointWriter,
485        TransactionsProvider,
486    };
487    use alloy_primitives::{Address, B256};
488    use reth_chainspec::MAINNET;
489    use reth_db::cursor::{DbCursorRO, DbCursorRW};
490    use reth_db_api::{
491        models::{storage_sharded_key::StorageShardedKey, StorageSettings},
492        tables::{self, BlockNumberList},
493        transaction::DbTxMut,
494    };
495    use reth_stages_types::StageCheckpoint;
496    use reth_testing_utils::generators::{self, BlockRangeParams};
497    use tempfile::TempDir;
498
/// `first` and `last` both report no entry on a freshly created, empty table.
#[test]
fn test_first_last_empty_rocksdb() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .with_table::<tables::StoragesHistory>()
        .build()
        .unwrap();

    // Nothing was written, so there is neither a first nor a last row.
    let (first, last) = (
        rocksdb.first::<tables::TransactionHashNumbers>().unwrap(),
        rocksdb.last::<tables::TransactionHashNumbers>().unwrap(),
    );

    assert!(first.is_none());
    assert!(last.is_none());
}
515
/// `last` returns the stored row once a single entry has been written.
#[test]
fn test_first_last_with_data() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .build()
        .unwrap();

    // Write exactly one hash -> tx number mapping.
    rocksdb.put::<tables::TransactionHashNumbers>(B256::from([1u8; 32]), &100).unwrap();

    // The only row is also the last row.
    let last = rocksdb.last::<tables::TransactionHashNumbers>().unwrap();
    assert!(last.is_some());
    assert_eq!(last.unwrap().1, 100);
}
533
/// With empty `RocksDB` tables and no stage checkpoints, no unwind is requested.
#[test]
fn test_check_consistency_empty_rocksdb_no_checkpoint_is_ok() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    // MDBX-backed provider factory with storage v2 enabled so every healing pass runs.
    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());
    let provider = factory.database_provider_ro().unwrap();

    // `None` means "no unwind needed".
    assert_eq!(rocksdb.check_consistency(&provider).unwrap(), None);
}
549
/// Regression test: with `checkpoint=0` and empty `RocksDB` tables every healer returns early
/// instead of entering an expensive healing loop.
///
/// Previously, when `sf_tip` > `checkpoint=0`, the healer would iterate billions of
/// transactions from static files for no effect, causing the node to hang on startup with
/// MDBX read transaction timeouts.
#[test]
fn test_check_consistency_checkpoint_zero_empty_rocksdb_returns_early() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // No checkpoint is ever saved, so each healer falls back to 0 via its default.
    // The RocksDB tables are empty as well.
    let provider = factory.database_provider_ro().unwrap();

    assert_eq!(
        rocksdb.heal_transaction_hash_numbers(&provider).unwrap(),
        None,
        "TransactionHashNumbers should return early at checkpoint 0"
    );
    assert!(rocksdb.first::<tables::TransactionHashNumbers>().unwrap().is_none());

    assert_eq!(
        rocksdb.heal_storages_history(&provider).unwrap(),
        None,
        "StoragesHistory should return early at checkpoint 0"
    );

    assert_eq!(
        rocksdb.heal_accounts_history(&provider).unwrap(),
        None,
        "AccountsHistory should return early at checkpoint 0"
    );
    assert_eq!(rocksdb.iter::<tables::AccountsHistory>().unwrap().count(), 0);
}
577
/// A `TransactionLookup` checkpoint ahead of the (absent) static file tip requests an unwind
/// to block 0.
#[test]
fn test_check_consistency_empty_rocksdb_with_checkpoint_is_first_run() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    // MDBX-backed provider factory with storage v2 enabled.
    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Claim that transaction lookup already processed everything up to block 100.
    {
        let provider_rw = factory.database_provider_rw().unwrap();
        let processed = StageCheckpoint::new(100);
        provider_rw.save_stage_checkpoint(StageId::TransactionLookup, processed).unwrap();
        provider_rw.commit().unwrap();
    }

    let provider = factory.database_provider_ro().unwrap();

    // RocksDB is empty while the checkpoint says block 100 was processed. A missing static
    // file tip defaults to 0, and 0 < 100, so an unwind is triggered.
    let unwind_target = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(
        unwind_target,
        Some(0),
        "Static file tip (0) behind checkpoint (100) triggers unwind"
    );
}
603
/// When the checkpoint is 0 but `RocksDB` still holds `TransactionHashNumbers` entries, all
/// of them are removed.
///
/// Simulates crash recovery where the checkpoint was lost.
#[test]
fn test_check_consistency_checkpoint_zero_with_rocksdb_data_prunes_all() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Blocks 0..=2 carrying real transactions.
    let mut rng = generators::rng();
    let params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
    let blocks = generators::random_block_range(&mut rng, 0..=2, params);

    // Insert the blocks and mirror every tx hash into RocksDB, remembering the hashes.
    let mut tx_hashes = Vec::new();
    {
        let provider_rw = factory.database_provider_rw().unwrap();
        let mut next_tx_num = 0u64;
        for block in &blocks {
            let recovered = block.clone().try_recover().expect("recover block");
            provider_rw.insert_block(&recovered).unwrap();
            for tx in &block.body().transactions {
                let hash = *tx.tx_hash();
                tx_hashes.push(hash);
                rocksdb.put::<tables::TransactionHashNumbers>(hash, &next_tx_num).unwrap();
                next_tx_num += 1;
            }
        }
        provider_rw.commit().unwrap();
    }

    // Explicitly reset every relevant checkpoint to 0 to simulate crash recovery.
    {
        let provider_rw = factory.database_provider_rw().unwrap();
        for stage in [
            StageId::TransactionLookup,
            StageId::IndexStorageHistory,
            StageId::IndexAccountHistory,
        ] {
            provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(0)).unwrap();
        }
        provider_rw.commit().unwrap();
    }

    // Sanity check: RocksDB really holds data before the consistency check.
    assert!(rocksdb.last::<tables::TransactionHashNumbers>().unwrap().is_some());

    let provider = factory.database_provider_ro().unwrap();

    // checkpoint = 0 while RocksDB has data: the data is stale and must be cleared.
    let unwind_target = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(unwind_target, None, "Should heal by clearing, no unwind needed");

    // None of the previously written hashes may still resolve.
    for hash in &tx_hashes {
        assert!(
            rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
            "RocksDB should be empty after pruning"
        );
    }
}
673
/// An `IndexStorageHistory` checkpoint ahead of the (absent) static file tip requests an
/// unwind to block 0.
#[test]
fn test_check_consistency_storages_history_empty_with_checkpoint_is_first_run() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    // MDBX-backed provider factory with storage v2 enabled.
    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Claim that storage history indexing already processed everything up to block 100.
    {
        let provider_rw = factory.database_provider_rw().unwrap();
        let processed = StageCheckpoint::new(100);
        provider_rw.save_stage_checkpoint(StageId::IndexStorageHistory, processed).unwrap();
        provider_rw.commit().unwrap();
    }

    let provider = factory.database_provider_ro().unwrap();

    // RocksDB is empty while the checkpoint says block 100 was processed.
    // With sf_tip=0 < checkpoint=100 the unwind target is 0.
    // This should never happen in normal operation.
    let unwind_target = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(unwind_target, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
}
700
/// Genesis storage history written to `RocksDB` is still present after healing
/// `StoragesHistory` when no checkpoint has been saved.
#[test]
fn test_check_consistency_storages_history_preserves_genesis_entries_at_checkpoint_zero(
) -> eyre::Result<()> {
    // Mainnet chainspec, patched so its first genesis account owns a single storage slot.
    let mut chain_spec = MAINNET.clone();
    {
        let spec = Arc::make_mut(&mut chain_spec);
        let mut first_account = spec.genesis.alloc.first_entry().unwrap();
        first_account.get_mut().storage =
            Some(From::from([(B256::random(), B256::random())]));
    }

    // Provider factory for that chainspec; no checkpoint is saved at any point.
    let factory = create_test_provider_factory_with_chain_spec(chain_spec);
    let rocksdb = factory.rocksdb_provider();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Seed RocksDB with the genesis history.
    let provider_rw = factory.database_provider_rw().unwrap();
    insert_genesis_history(&provider_rw, factory.chain_spec().genesis.alloc.iter())?;
    provider_rw.commit()?;

    let provider = factory.database_provider_ro().unwrap();

    // Only genesis entries exist, so healing must not request an unwind.
    let unwind_target = rocksdb.heal_storages_history(&provider).unwrap();
    assert_eq!(unwind_target, None, "Should skip healing when only genesis entries present");

    // The genesis rows must still be there.
    assert!(
        rocksdb.iter::<tables::StoragesHistory>().unwrap().count() > 0,
        "Genesis entries should be preserved"
    );

    Ok(())
}
733
/// A `TransactionLookup` checkpoint beyond the inserted blocks requests an unwind to the
/// static file tip.
#[test]
fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() {
    let dir = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // Blocks 0..=2 with real transactions (6 transactions in total).
    let mut rng = generators::rng();
    let params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
    let blocks = generators::random_block_range(&mut rng, 0..=2, params);

    // Insert the blocks and mirror every tx hash into RocksDB.
    {
        let provider_rw = factory.database_provider_rw().unwrap();
        let mut next_tx_num = 0u64;
        for block in &blocks {
            let recovered = block.clone().try_recover().expect("recover block");
            provider_rw.insert_block(&recovered).unwrap();
            for tx in &block.body().transactions {
                let hash = *tx.tx_hash();
                rocksdb.put::<tables::TransactionHashNumbers>(hash, &next_tx_num).unwrap();
                next_tx_num += 1;
            }
        }
        provider_rw.commit().unwrap();
    }

    // Put the TransactionLookup checkpoint at block 10, beyond the data that ends at block 2.
    // sf_tip is 2 and the checkpoint is 10, so sf_tip < checkpoint and an unwind to sf_tip is
    // required. The history checkpoints are reset to 0 so they do not interfere.
    {
        let provider_rw = factory.database_provider_rw().unwrap();
        for (stage, block) in [
            (StageId::TransactionLookup, 10),
            (StageId::IndexStorageHistory, 0),
            (StageId::IndexAccountHistory, 0),
        ] {
            provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(block)).unwrap();
        }
        provider_rw.commit().unwrap();
    }

    let provider = factory.database_provider_ro().unwrap();

    // sf_tip (2) < checkpoint (10), so unwind to sf_tip is needed
    let unwind_target = rocksdb.check_consistency(&provider).unwrap();
    assert_eq!(unwind_target, Some(2), "sf_tip < checkpoint requires unwind to sf_tip");
}
790
791    #[test]
792    fn test_check_consistency_rocksdb_ahead_of_checkpoint_prunes_excess() {
793        let temp_dir = TempDir::new().unwrap();
794        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
795
796        // Create a test provider factory for MDBX
797        let factory = create_test_provider_factory();
798        factory.set_storage_settings_cache(StorageSettings::v2());
799
800        // Generate blocks with real transactions:
801        // Blocks 0-5, each with 2 transactions = 12 total transactions (0-11)
802        let mut rng = generators::rng();
803        let blocks = generators::random_block_range(
804            &mut rng,
805            0..=5,
806            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
807        );
808
809        // Track which hashes belong to which blocks
810        let mut tx_hashes = Vec::new();
811        let mut tx_count = 0u64;
812        {
813            let provider = factory.database_provider_rw().unwrap();
814            // Insert ALL blocks (0-5) to write transactions to static files
815            for block in &blocks {
816                provider
817                    .insert_block(&block.clone().try_recover().expect("recover block"))
818                    .unwrap();
819                for tx in &block.body().transactions {
820                    let hash = *tx.tx_hash();
821                    tx_hashes.push(hash);
822                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
823                    tx_count += 1;
824                }
825            }
826            provider.commit().unwrap();
827        }
828
829        // Simulate crash recovery scenario:
830        // MDBX was unwound to block 2, but RocksDB and static files still have more data.
831        // Remove TransactionBlocks entries for blocks 3-5 to simulate MDBX unwind.
832        {
833            let provider = factory.database_provider_rw().unwrap();
834            // Delete TransactionBlocks entries for tx > 5 (i.e., for blocks 3-5)
835            // TransactionBlocks maps last_tx_in_block -> block_number
836            // After unwind, only entries for blocks 0-2 should remain (tx 5 -> block 2)
837            let mut cursor = provider.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();
838            // Walk and delete entries where block > 2
839            let mut to_delete = Vec::new();
840            let mut walker = cursor.walk(Some(0)).unwrap();
841            while let Some((tx_num, block_num)) = walker.next().transpose().unwrap() {
842                if block_num > 2 {
843                    to_delete.push(tx_num);
844                }
845            }
846            drop(walker);
847            for tx_num in to_delete {
848                cursor.seek_exact(tx_num).unwrap();
849                cursor.delete_current().unwrap();
850            }
851
852            // Set checkpoint to block 2
853            provider
854                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
855                .unwrap();
856            // Reset history checkpoints so they don't interfere
857            provider
858                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
859                .unwrap();
860            provider
861                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
862                .unwrap();
863            provider.commit().unwrap();
864        }
865
866        let provider = factory.database_provider_ro().unwrap();
867
868        // RocksDB has tx hashes for all blocks (0-5)
869        // MDBX TransactionBlocks only goes up to tx 5 (block 2)
870        // Static files have data for all txs (0-11)
871        // This means RocksDB is ahead and should prune entries for tx 6-11
872        let result = rocksdb.check_consistency(&provider).unwrap();
873        assert_eq!(result, None, "Should heal by pruning, no unwind needed");
874
875        // Verify: hashes for blocks 0-2 (tx 0-5) should remain, blocks 3-5 (tx 6-11) should be
876        // pruned First 6 hashes should remain
877        for (i, hash) in tx_hashes.iter().take(6).enumerate() {
878            assert!(
879                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_some(),
880                "tx {} should remain",
881                i
882            );
883        }
884        // Last 6 hashes should be pruned
885        for (i, hash) in tx_hashes.iter().skip(6).enumerate() {
886            assert!(
887                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
888                "tx {} should be pruned",
889                i + 6
890            );
891        }
892    }
893
894    #[test]
895    fn test_check_consistency_storages_history_sentinel_only_with_checkpoint_is_first_run() {
896        let temp_dir = TempDir::new().unwrap();
897        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
898
899        // Insert ONLY sentinel entries (highest_block_number = u64::MAX)
900        // This simulates a scenario where history tracking started but no shards were completed
901        let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
902        let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
903        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
904        rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
905        rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
906
907        // Verify entries exist (not empty table)
908        assert!(rocksdb.first::<tables::StoragesHistory>().unwrap().is_some());
909
910        // Create a test provider factory for MDBX
911        let factory = create_test_provider_factory();
912        factory.set_storage_settings_cache(StorageSettings::v2());
913
914        // Set a checkpoint indicating we should have processed up to block 100
915        {
916            let provider = factory.database_provider_rw().unwrap();
917            provider
918                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
919                .unwrap();
920            provider.commit().unwrap();
921        }
922
923        let provider = factory.database_provider_ro().unwrap();
924
925        // RocksDB has only sentinel entries but checkpoint is set.
926        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
927        // This should never happen in normal operation.
928        let result = rocksdb.check_consistency(&provider).unwrap();
929        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
930    }
931
932    #[test]
933    fn test_check_consistency_accounts_history_sentinel_only_with_checkpoint_is_first_run() {
934        use reth_db_api::models::ShardedKey;
935
936        let temp_dir = TempDir::new().unwrap();
937        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
938
939        // Insert ONLY sentinel entries (highest_block_number = u64::MAX)
940        let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
941        let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
942        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
943        rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
944        rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
945
946        // Verify entries exist (not empty table)
947        assert!(rocksdb.first::<tables::AccountsHistory>().unwrap().is_some());
948
949        // Create a test provider factory for MDBX
950        let factory = create_test_provider_factory();
951        factory.set_storage_settings_cache(StorageSettings::v2());
952
953        // Set a checkpoint indicating we should have processed up to block 100
954        {
955            let provider = factory.database_provider_rw().unwrap();
956            provider
957                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
958                .unwrap();
959            provider.commit().unwrap();
960        }
961
962        let provider = factory.database_provider_ro().unwrap();
963
964        // RocksDB has only sentinel entries but checkpoint is set.
965        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
966        // This should never happen in normal operation.
967        let result = rocksdb.check_consistency(&provider).unwrap();
968        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
969    }
970
971    /// Test that pruning works by fetching transactions and reading their hashes,
972    /// rather than iterating all rows. This test uses random blocks with unique
973    /// transactions so we can verify the correct entries are pruned.
974    #[test]
975    fn test_prune_transaction_hash_numbers_by_range() {
976        let temp_dir = TempDir::new().unwrap();
977        let rocksdb = RocksDBBuilder::new(temp_dir.path())
978            .with_table::<tables::TransactionHashNumbers>()
979            .build()
980            .unwrap();
981
982        // Create a test provider factory for MDBX
983        let factory = create_test_provider_factory();
984        factory.set_storage_settings_cache(StorageSettings::v2());
985
986        // Generate random blocks with unique transactions
987        // Block 0 (genesis) has no transactions
988        // Blocks 1-5 each have 2 transactions = 10 transactions total
989        let mut rng = generators::rng();
990        let blocks = generators::random_block_range(
991            &mut rng,
992            0..=5,
993            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
994        );
995
996        // Insert blocks into the database
997        let mut tx_count = 0u64;
998        let mut tx_hashes = Vec::new();
999        {
1000            let provider = factory.database_provider_rw().unwrap();
1001
1002            for block in &blocks {
1003                provider
1004                    .insert_block(&block.clone().try_recover().expect("recover block"))
1005                    .unwrap();
1006
1007                // Store transaction hash -> tx_number mappings in RocksDB
1008                for tx in &block.body().transactions {
1009                    let hash = *tx.tx_hash();
1010                    tx_hashes.push(hash);
1011                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
1012                    tx_count += 1;
1013                }
1014            }
1015
1016            // Set checkpoint to block 2 (meaning we should only have tx hashes for blocks 0-2)
1017            // Blocks 0, 1, 2 have 6 transactions (2 each), so tx 0-5 should remain
1018            provider
1019                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
1020                .unwrap();
1021            provider.commit().unwrap();
1022        }
1023
1024        // At this point:
1025        // - RocksDB has tx hashes for blocks 0-5 (10 total: 2 per block)
1026        // - Checkpoint says we only processed up to block 2
1027        // - We need to prune tx hashes for blocks 3, 4, 5 (tx 6-9)
1028
1029        // Verify RocksDB has the expected number of entries before pruning
1030        let rocksdb_count_before: usize =
1031            rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().count();
1032        assert_eq!(
1033            rocksdb_count_before, tx_count as usize,
1034            "RocksDB should have all {} transaction hashes before pruning",
1035            tx_count
1036        );
1037
1038        let provider = factory.database_provider_ro().unwrap();
1039
1040        // Verify we can fetch transactions by tx range
1041        let all_txs = provider.transactions_by_tx_range(0..tx_count).unwrap();
1042        assert_eq!(all_txs.len(), tx_count as usize, "Should be able to fetch all transactions");
1043
1044        // Verify the hashes match between what we stored and what we read from fetched txs.
1045        for (i, tx) in all_txs.iter().enumerate() {
1046            let fetched_hash = *tx.tx_hash();
1047            assert_eq!(
1048                fetched_hash, tx_hashes[i],
1049                "Hash mismatch for tx {}: stored {:?} vs fetched {:?}",
1050                i, tx_hashes[i], fetched_hash
1051            );
1052        }
1053
1054        // Blocks 0, 1, 2 have 2 tx each = 6 tx total (indices 0-5)
1055        // We want to keep tx 0-5, prune tx 6-9
1056        let max_tx_to_keep = 5u64;
1057        let tx_to_prune_start = max_tx_to_keep + 1;
1058
1059        // Prune transactions 6-9 (blocks 3-5)
1060        rocksdb
1061            .prune_transaction_hash_numbers_in_range(&provider, tx_to_prune_start..=(tx_count - 1))
1062            .expect("prune should succeed");
1063
1064        // Verify: transactions 0-5 should remain, 6-9 should be pruned
1065        let mut remaining_count = 0;
1066        for result in rocksdb.iter::<tables::TransactionHashNumbers>().unwrap() {
1067            let (_hash, tx_num) = result.unwrap();
1068            assert!(
1069                tx_num <= max_tx_to_keep,
1070                "Transaction {} should have been pruned (> {})",
1071                tx_num,
1072                max_tx_to_keep
1073            );
1074            remaining_count += 1;
1075        }
1076        assert_eq!(
1077            remaining_count,
1078            (max_tx_to_keep + 1) as usize,
1079            "Should have {} transactions (0-{})",
1080            max_tx_to_keep + 1,
1081            max_tx_to_keep
1082        );
1083    }
1084
1085    #[test]
1086    fn test_check_consistency_accounts_history_empty_with_checkpoint_is_first_run() {
1087        let temp_dir = TempDir::new().unwrap();
1088        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1089
1090        // Create a test provider factory for MDBX
1091        let factory = create_test_provider_factory();
1092        factory.set_storage_settings_cache(StorageSettings::v2());
1093
1094        // Set a checkpoint indicating we should have processed up to block 100
1095        {
1096            let provider = factory.database_provider_rw().unwrap();
1097            provider
1098                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
1099                .unwrap();
1100            provider.commit().unwrap();
1101        }
1102
1103        let provider = factory.database_provider_ro().unwrap();
1104
1105        // RocksDB is empty but checkpoint says block 100 was processed.
1106        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
1107        // This should never happen in normal operation.
1108        let result = rocksdb.check_consistency(&provider).unwrap();
1109        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
1110    }
1111
1112    #[test]
1113    fn test_check_consistency_accounts_history_preserves_genesis_entries_at_checkpoint_zero(
1114    ) -> eyre::Result<()> {
1115        // Create a test provider factory for MDBX with NO checkpoint
1116        let factory = create_test_provider_factory();
1117        factory.set_storage_settings_cache(StorageSettings::v2());
1118        let rocksdb = factory.rocksdb_provider();
1119
1120        // Insert genesis history into RocksDB
1121        let provider_rw = factory.database_provider_rw().unwrap();
1122        insert_genesis_history(&provider_rw, factory.chain_spec().genesis.alloc.iter())?;
1123        provider_rw.commit()?;
1124
1125        let provider = factory.database_provider_ro().unwrap();
1126
1127        // This should not prune anything because only genesis entries are present
1128        let result = rocksdb.check_consistency(&provider).unwrap();
1129        assert_eq!(result, None, "Should heal by pruning, no unwind needed");
1130
1131        // Verify data was NOT deleted
1132        assert!(
1133            rocksdb.iter::<tables::AccountsHistory>().unwrap().count() > 0,
1134            "Genesis entries should be preserved"
1135        );
1136
1137        Ok(())
1138    }
1139
1140    #[test]
1141    fn test_check_consistency_accounts_history_sf_tip_equals_checkpoint_no_action() {
1142        use reth_db::models::AccountBeforeTx;
1143        use reth_db_api::models::ShardedKey;
1144        use reth_static_file_types::StaticFileSegment;
1145
1146        let temp_dir = TempDir::new().unwrap();
1147        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1148
1149        // Insert some AccountsHistory entries with various highest_block_numbers
1150        let key1 = ShardedKey::new(Address::ZERO, 50);
1151        let key2 = ShardedKey::new(Address::random(), 75);
1152        let key3 = ShardedKey::new(Address::random(), u64::MAX); // sentinel
1153        let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
1154        let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 75]);
1155        let block_list3 = BlockNumberList::new_pre_sorted([80, 90, 100]);
1156        rocksdb.put::<tables::AccountsHistory>(key1, &block_list1).unwrap();
1157        rocksdb.put::<tables::AccountsHistory>(key2, &block_list2).unwrap();
1158        rocksdb.put::<tables::AccountsHistory>(key3, &block_list3).unwrap();
1159
1160        // Capture RocksDB state before consistency check
1161        let entries_before: Vec<_> =
1162            rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|r| r.unwrap()).collect();
1163        assert_eq!(entries_before.len(), 3, "Should have 3 entries before check");
1164
1165        // Create a test provider factory for MDBX
1166        let factory = create_test_provider_factory();
1167        factory.set_storage_settings_cache(StorageSettings::v2());
1168
1169        // Write account changesets to static files for blocks 0-100
1170        {
1171            let sf_provider = factory.static_file_provider();
1172            let mut writer =
1173                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1174
1175            for block_num in 0..=100 {
1176                let changeset = vec![AccountBeforeTx { address: Address::random(), info: None }];
1177                writer.append_account_changeset(changeset, block_num).unwrap();
1178            }
1179
1180            writer.commit().unwrap();
1181        }
1182
1183        // Set IndexAccountHistory checkpoint to block 100 (same as sf_tip)
1184        {
1185            let provider = factory.database_provider_rw().unwrap();
1186            provider
1187                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
1188                .unwrap();
1189            provider.commit().unwrap();
1190        }
1191
1192        let provider = factory.database_provider_ro().unwrap();
1193
1194        // Verify sf_tip equals checkpoint (both at 100)
1195        let sf_tip = provider
1196            .static_file_provider()
1197            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
1198            .unwrap();
1199        assert_eq!(sf_tip, 100, "Static file tip should be 100");
1200
1201        // Run check_consistency - should return None (no unwind needed)
1202        let result = rocksdb.check_consistency(&provider).unwrap();
1203        assert_eq!(result, None, "sf_tip == checkpoint should not require unwind");
1204
1205        // Verify NO entries are deleted - RocksDB state unchanged
1206        let entries_after: Vec<_> =
1207            rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|r| r.unwrap()).collect();
1208
1209        assert_eq!(
1210            entries_after.len(),
1211            entries_before.len(),
1212            "RocksDB entry count should be unchanged when sf_tip == checkpoint"
1213        );
1214
1215        // Verify exact entries are preserved
1216        for (before, after) in entries_before.iter().zip(entries_after.iter()) {
1217            assert_eq!(before.0.key, after.0.key, "Entry key should be unchanged");
1218            assert_eq!(
1219                before.0.highest_block_number, after.0.highest_block_number,
1220                "Entry highest_block_number should be unchanged"
1221            );
1222            assert_eq!(before.1, after.1, "Entry block list should be unchanged");
1223        }
1224    }
1225
1226    /// Tests `StoragesHistory` changeset-based healing with enough blocks to trigger batching.
1227    ///
1228    /// Scenario:
1229    /// 1. Generate 15,000 blocks worth of storage changeset data (to exceed the 10k batch size)
1230    /// 2. Each block has 1 storage change (address + slot + value)
1231    /// 3. Write storage changesets to static files for all 15k blocks
1232    /// 4. Set `IndexStorageHistory` checkpoint to block 5000
1233    /// 5. Insert stale `StoragesHistory` entries in `RocksDB` for (address, slot) pairs that
1234    ///    changed in blocks 5001-15000
1235    /// 6. Run `check_consistency`
1236    /// 7. Verify stale entries for blocks > 5000 are pruned and batching worked
1237    #[test]
1238    fn test_check_consistency_storages_history_heals_via_changesets_large_range() {
1239        use alloy_primitives::U256;
1240        use reth_db_api::models::StorageBeforeTx;
1241
1242        const TOTAL_BLOCKS: u64 = 15_000;
1243        const CHECKPOINT_BLOCK: u64 = 5_000;
1244
1245        let temp_dir = TempDir::new().unwrap();
1246        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1247
1248        let factory = create_test_provider_factory();
1249        factory.set_storage_settings_cache(StorageSettings::v2());
1250
1251        // Helper to generate address from block number (reuses stack arrays)
1252        #[inline]
1253        fn make_address(block_num: u64) -> Address {
1254            let mut addr_bytes = [0u8; 20];
1255            addr_bytes[0..8].copy_from_slice(&block_num.to_le_bytes());
1256            Address::from(addr_bytes)
1257        }
1258
1259        // Helper to generate slot from block number (reuses stack arrays)
1260        #[inline]
1261        fn make_slot(block_num: u64) -> B256 {
1262            let mut slot_bytes = [0u8; 32];
1263            slot_bytes[0..8].copy_from_slice(&block_num.to_le_bytes());
1264            B256::from(slot_bytes)
1265        }
1266
1267        // Write storage changesets to static files for 15k blocks.
1268        // Each block has 1 storage change with a unique (address, slot) pair.
1269        {
1270            let sf_provider = factory.static_file_provider();
1271            let mut writer =
1272                sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1273
1274            // Reuse changeset vec to avoid repeated allocations
1275            let mut changeset = Vec::with_capacity(1);
1276
1277            for block_num in 0..TOTAL_BLOCKS {
1278                changeset.clear();
1279                changeset.push(StorageBeforeTx {
1280                    address: make_address(block_num),
1281                    key: make_slot(block_num),
1282                    value: U256::from(block_num),
1283                });
1284
1285                writer.append_storage_changeset(changeset.clone(), block_num).unwrap();
1286            }
1287
1288            writer.commit().unwrap();
1289        }
1290
1291        // Verify static files have data up to block 14999
1292        {
1293            let sf_provider = factory.static_file_provider();
1294            let highest = sf_provider
1295                .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
1296                .unwrap();
1297            assert_eq!(highest, TOTAL_BLOCKS - 1, "Static files should have blocks 0..14999");
1298        }
1299
1300        // Set IndexStorageHistory checkpoint to block 5000
1301        {
1302            let provider = factory.database_provider_rw().unwrap();
1303            provider
1304                .save_stage_checkpoint(
1305                    StageId::IndexStorageHistory,
1306                    StageCheckpoint::new(CHECKPOINT_BLOCK),
1307                )
1308                .unwrap();
1309            provider.commit().unwrap();
1310        }
1311
1312        // Insert stale StoragesHistory entries for blocks 5001-14999
1313        // These are (address, slot) pairs that changed after the checkpoint
1314        for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
1315            let key =
1316                StorageShardedKey::new(make_address(block_num), make_slot(block_num), block_num);
1317            let block_list = BlockNumberList::new_pre_sorted([block_num]);
1318            rocksdb.put::<tables::StoragesHistory>(key, &block_list).unwrap();
1319        }
1320
1321        // Verify RocksDB has stale entries before healing
1322        let count_before: usize = rocksdb.iter::<tables::StoragesHistory>().unwrap().count();
1323        assert_eq!(
1324            count_before,
1325            (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize,
1326            "Should have {} stale entries before healing",
1327            TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1
1328        );
1329
1330        // Run check_consistency - this should heal by pruning stale entries
1331        let provider = factory.database_provider_ro().unwrap();
1332        let result = rocksdb.check_consistency(&provider).unwrap();
1333        assert_eq!(result, None, "Should heal via changesets, no unwind needed");
1334
1335        // Verify all stale entries were pruned
1336        // After healing, entries with highest_block_number > checkpoint should be gone
1337        let mut remaining_stale = 0;
1338        for result in rocksdb.iter::<tables::StoragesHistory>().unwrap() {
1339            let (key, _) = result.unwrap();
1340            if key.sharded_key.highest_block_number > CHECKPOINT_BLOCK {
1341                remaining_stale += 1;
1342            }
1343        }
1344        assert_eq!(
1345            remaining_stale, 0,
1346            "All stale entries (block > {}) should be pruned",
1347            CHECKPOINT_BLOCK
1348        );
1349    }
1350
1351    /// Tests that healing preserves entries at exactly the checkpoint block.
1352    ///
1353    /// This catches off-by-one bugs where checkpoint block data is incorrectly deleted.
1354    #[test]
1355    fn test_check_consistency_storages_history_preserves_checkpoint_block() {
1356        use alloy_primitives::U256;
1357        use reth_db_api::models::StorageBeforeTx;
1358
1359        const CHECKPOINT_BLOCK: u64 = 100;
1360        const SF_TIP: u64 = 200;
1361
1362        let temp_dir = TempDir::new().unwrap();
1363        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1364
1365        let factory = create_test_provider_factory();
1366        factory.set_storage_settings_cache(StorageSettings::v2());
1367
1368        let checkpoint_addr = Address::repeat_byte(0xAA);
1369        let checkpoint_slot = B256::repeat_byte(0xBB);
1370        let stale_addr = Address::repeat_byte(0xCC);
1371        let stale_slot = B256::repeat_byte(0xDD);
1372
1373        // Write storage changesets to static files
1374        {
1375            let sf_provider = factory.static_file_provider();
1376            let mut writer =
1377                sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1378
1379            for block_num in 0..=SF_TIP {
1380                let changeset = if block_num == CHECKPOINT_BLOCK {
1381                    vec![StorageBeforeTx {
1382                        address: checkpoint_addr,
1383                        key: checkpoint_slot,
1384                        value: U256::from(block_num),
1385                    }]
1386                } else if block_num > CHECKPOINT_BLOCK {
1387                    vec![StorageBeforeTx {
1388                        address: stale_addr,
1389                        key: stale_slot,
1390                        value: U256::from(block_num),
1391                    }]
1392                } else {
1393                    vec![StorageBeforeTx {
1394                        address: Address::ZERO,
1395                        key: B256::ZERO,
1396                        value: U256::ZERO,
1397                    }]
1398                };
1399                writer.append_storage_changeset(changeset, block_num).unwrap();
1400            }
1401            writer.commit().unwrap();
1402        }
1403
1404        // Set checkpoint
1405        {
1406            let provider = factory.database_provider_rw().unwrap();
1407            provider
1408                .save_stage_checkpoint(
1409                    StageId::IndexStorageHistory,
1410                    StageCheckpoint::new(CHECKPOINT_BLOCK),
1411                )
1412                .unwrap();
1413            provider.commit().unwrap();
1414        }
1415
1416        // Insert entry AT the checkpoint block (should be preserved)
1417        let checkpoint_key =
1418            StorageShardedKey::new(checkpoint_addr, checkpoint_slot, CHECKPOINT_BLOCK);
1419        let checkpoint_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]);
1420        rocksdb.put::<tables::StoragesHistory>(checkpoint_key.clone(), &checkpoint_list).unwrap();
1421
1422        // Insert stale entry AFTER the checkpoint (should be removed)
1423        let stale_key = StorageShardedKey::new(stale_addr, stale_slot, SF_TIP);
1424        let stale_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]);
1425        rocksdb.put::<tables::StoragesHistory>(stale_key.clone(), &stale_list).unwrap();
1426
1427        // Run healing
1428        let provider = factory.database_provider_ro().unwrap();
1429        let result = rocksdb.check_consistency(&provider).unwrap();
1430        assert_eq!(result, None, "Should heal without unwind");
1431
1432        // Verify checkpoint block entry is PRESERVED
1433        let preserved = rocksdb.get::<tables::StoragesHistory>(checkpoint_key).unwrap();
1434        assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");
1435
1436        // Verify stale entry is removed or unwound
1437        let stale = rocksdb.get::<tables::StoragesHistory>(stale_key).unwrap();
1438        assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
1439    }
1440
1441    /// Tests `AccountsHistory` changeset-based healing with enough blocks to trigger batching.
1442    ///
1443    /// Scenario:
1444    /// 1. Generate 15,000 blocks worth of account changeset data (to exceed the 10k batch size)
1445    /// 2. Each block has 1 account change (simple - just random addresses)
1446    /// 3. Write account changesets to static files for all 15k blocks
1447    /// 4. Set `IndexAccountHistory` checkpoint to block 5000
1448    /// 5. Insert stale `AccountsHistory` entries in `RocksDB` for addresses that changed in blocks
1449    ///    5001-15000
1450    /// 6. Run `check_consistency`
1451    /// 7. Verify:
1452    ///    - Stale entries for blocks > 5000 are pruned
1453    ///    - The batching worked (no OOM, completed successfully)
1454    #[test]
1455    fn test_check_consistency_accounts_history_heals_via_changesets_large_range() {
1456        use reth_db::models::AccountBeforeTx;
1457        use reth_db_api::models::ShardedKey;
1458        use reth_static_file_types::StaticFileSegment;
1459
1460        let temp_dir = TempDir::new().unwrap();
1461        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1462
1463        // Create test provider factory
1464        let factory = create_test_provider_factory();
1465        factory.set_storage_settings_cache(StorageSettings::v2());
1466
1467        const TOTAL_BLOCKS: u64 = 15_000;
1468        const CHECKPOINT_BLOCK: u64 = 5_000;
1469
1470        // Helper to generate address from block number (avoids pre-allocating 15k addresses)
1471        #[inline]
1472        fn make_address(block_num: u64) -> Address {
1473            let mut addr = Address::ZERO;
1474            addr.0[0..8].copy_from_slice(&block_num.to_le_bytes());
1475            addr
1476        }
1477
1478        // Write account changesets to static files for all 15k blocks
1479        {
1480            let sf_provider = factory.static_file_provider();
1481            let mut writer =
1482                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1483
1484            // Reuse changeset vec to avoid repeated allocations
1485            let mut changeset = Vec::with_capacity(1);
1486
1487            for block_num in 0..TOTAL_BLOCKS {
1488                changeset.clear();
1489                changeset.push(AccountBeforeTx { address: make_address(block_num), info: None });
1490                writer.append_account_changeset(changeset.clone(), block_num).unwrap();
1491            }
1492
1493            writer.commit().unwrap();
1494        }
1495
1496        // Insert stale AccountsHistory entries in RocksDB for addresses that changed
1497        // in blocks 5001-15000 (i.e., blocks after the checkpoint)
1498        // These should be pruned by check_consistency
1499        for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
1500            let key = ShardedKey::new(make_address(block_num), block_num);
1501            let block_list = BlockNumberList::new_pre_sorted([block_num]);
1502            rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
1503        }
1504
1505        // Also insert some valid entries for blocks <= 5000 that should NOT be pruned
1506        for block_num in [100u64, 500, 1000, 2500, 5000] {
1507            let key = ShardedKey::new(make_address(block_num), block_num);
1508            let block_list = BlockNumberList::new_pre_sorted([block_num]);
1509            rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
1510        }
1511
1512        // Verify we have entries before healing
1513        let entries_before: usize = rocksdb.iter::<tables::AccountsHistory>().unwrap().count();
1514        let stale_count = (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize;
1515        let valid_count = 5usize;
1516        assert_eq!(
1517            entries_before,
1518            stale_count + valid_count,
1519            "Should have {} stale + {} valid entries before healing",
1520            stale_count,
1521            valid_count
1522        );
1523
1524        // Set IndexAccountHistory checkpoint to block 5000
1525        {
1526            let provider = factory.database_provider_rw().unwrap();
1527            provider
1528                .save_stage_checkpoint(
1529                    StageId::IndexAccountHistory,
1530                    StageCheckpoint::new(CHECKPOINT_BLOCK),
1531                )
1532                .unwrap();
1533            provider.commit().unwrap();
1534        }
1535
1536        let provider = factory.database_provider_ro().unwrap();
1537
1538        // Verify sf_tip > checkpoint
1539        let sf_tip = provider
1540            .static_file_provider()
1541            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
1542            .unwrap();
1543        assert_eq!(sf_tip, TOTAL_BLOCKS - 1, "Static file tip should be 14999");
1544        assert!(sf_tip > CHECKPOINT_BLOCK, "sf_tip should be > checkpoint to trigger healing");
1545
1546        // Run check_consistency - this should trigger batched changeset-based healing
1547        let result = rocksdb.check_consistency(&provider).unwrap();
1548        assert_eq!(result, None, "Healing should succeed without requiring unwind");
1549
1550        // Verify: all stale entries for blocks > 5000 should be pruned
1551        // Count remaining entries with highest_block_number > checkpoint
1552        let mut remaining_stale = 0;
1553        for result in rocksdb.iter::<tables::AccountsHistory>().unwrap() {
1554            let (key, _) = result.unwrap();
1555            if key.highest_block_number > CHECKPOINT_BLOCK && key.highest_block_number != u64::MAX {
1556                remaining_stale += 1;
1557            }
1558        }
1559        assert_eq!(
1560            remaining_stale, 0,
1561            "All stale entries (block > {}) should be pruned",
1562            CHECKPOINT_BLOCK
1563        );
1564    }
1565
1566    /// Tests that accounts history healing preserves entries at exactly the checkpoint block.
1567    #[test]
1568    fn test_check_consistency_accounts_history_preserves_checkpoint_block() {
1569        use reth_db::models::AccountBeforeTx;
1570        use reth_db_api::models::ShardedKey;
1571
1572        const CHECKPOINT_BLOCK: u64 = 100;
1573        const SF_TIP: u64 = 200;
1574
1575        let temp_dir = TempDir::new().unwrap();
1576        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1577
1578        let factory = create_test_provider_factory();
1579        factory.set_storage_settings_cache(StorageSettings::v2());
1580
1581        let checkpoint_addr = Address::repeat_byte(0xAA);
1582        let stale_addr = Address::repeat_byte(0xCC);
1583
1584        // Write account changesets to static files
1585        {
1586            let sf_provider = factory.static_file_provider();
1587            let mut writer =
1588                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1589
1590            for block_num in 0..=SF_TIP {
1591                let changeset = if block_num == CHECKPOINT_BLOCK {
1592                    vec![AccountBeforeTx { address: checkpoint_addr, info: None }]
1593                } else if block_num > CHECKPOINT_BLOCK {
1594                    vec![AccountBeforeTx { address: stale_addr, info: None }]
1595                } else {
1596                    vec![AccountBeforeTx { address: Address::ZERO, info: None }]
1597                };
1598                writer.append_account_changeset(changeset, block_num).unwrap();
1599            }
1600            writer.commit().unwrap();
1601        }
1602
1603        // Set checkpoint
1604        {
1605            let provider = factory.database_provider_rw().unwrap();
1606            provider
1607                .save_stage_checkpoint(
1608                    StageId::IndexAccountHistory,
1609                    StageCheckpoint::new(CHECKPOINT_BLOCK),
1610                )
1611                .unwrap();
1612            provider.commit().unwrap();
1613        }
1614
1615        // Insert entry AT the checkpoint block (should be preserved)
1616        let checkpoint_key = ShardedKey::new(checkpoint_addr, CHECKPOINT_BLOCK);
1617        let checkpoint_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]);
1618        rocksdb.put::<tables::AccountsHistory>(checkpoint_key.clone(), &checkpoint_list).unwrap();
1619
1620        // Insert stale entry AFTER the checkpoint (should be removed)
1621        let stale_key = ShardedKey::new(stale_addr, SF_TIP);
1622        let stale_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]);
1623        rocksdb.put::<tables::AccountsHistory>(stale_key.clone(), &stale_list).unwrap();
1624
1625        // Run healing
1626        let provider = factory.database_provider_ro().unwrap();
1627        let result = rocksdb.check_consistency(&provider).unwrap();
1628        assert_eq!(result, None, "Should heal without unwind");
1629
1630        // Verify checkpoint block entry is PRESERVED
1631        let preserved = rocksdb.get::<tables::AccountsHistory>(checkpoint_key).unwrap();
1632        assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");
1633
1634        // Verify stale entry is removed or unwound
1635        let stale = rocksdb.get::<tables::AccountsHistory>(stale_key).unwrap();
1636        assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
1637    }
1638
1639    #[test]
1640    fn test_check_consistency_storages_history_sf_tip_equals_checkpoint_no_action() {
1641        use alloy_primitives::U256;
1642        use reth_db::models::StorageBeforeTx;
1643        use reth_static_file_types::StaticFileSegment;
1644
1645        let temp_dir = TempDir::new().unwrap();
1646        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1647
1648        // Insert StoragesHistory entries into RocksDB
1649        let key1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
1650        let key2 = StorageShardedKey::new(Address::random(), B256::random(), 80);
1651        let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
1652        let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 80]);
1653        rocksdb.put::<tables::StoragesHistory>(key1, &block_list1).unwrap();
1654        rocksdb.put::<tables::StoragesHistory>(key2, &block_list2).unwrap();
1655
1656        // Capture entries before consistency check
1657        let entries_before: Vec<_> =
1658            rocksdb.iter::<tables::StoragesHistory>().unwrap().map(|r| r.unwrap()).collect();
1659
1660        // Create a test provider factory
1661        let factory = create_test_provider_factory();
1662        factory.set_storage_settings_cache(StorageSettings::v2());
1663
1664        // Write storage changesets to static files for blocks 0-100
1665        {
1666            let sf_provider = factory.static_file_provider();
1667            let mut writer =
1668                sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1669
1670            for block_num in 0..=100u64 {
1671                let changeset = vec![StorageBeforeTx {
1672                    address: Address::ZERO,
1673                    key: B256::with_last_byte(block_num as u8),
1674                    value: U256::from(block_num),
1675                }];
1676                writer.append_storage_changeset(changeset, block_num).unwrap();
1677            }
1678            writer.commit().unwrap();
1679        }
1680
1681        // Set IndexStorageHistory checkpoint to block 100 (same as sf_tip)
1682        {
1683            let provider = factory.database_provider_rw().unwrap();
1684            provider
1685                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
1686                .unwrap();
1687            provider.commit().unwrap();
1688        }
1689
1690        let provider = factory.database_provider_ro().unwrap();
1691
1692        // Verify sf_tip equals checkpoint (both at 100)
1693        let sf_tip = provider
1694            .static_file_provider()
1695            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
1696            .unwrap();
1697        assert_eq!(sf_tip, 100, "Static file tip should be 100");
1698
1699        // Run check_consistency - should return None (no unwind needed)
1700        let result = rocksdb.check_consistency(&provider).unwrap();
1701        assert_eq!(result, None, "sf_tip == checkpoint should not require unwind");
1702
1703        // Verify NO entries are deleted - RocksDB state unchanged
1704        let entries_after: Vec<_> =
1705            rocksdb.iter::<tables::StoragesHistory>().unwrap().map(|r| r.unwrap()).collect();
1706
1707        assert_eq!(
1708            entries_after.len(),
1709            entries_before.len(),
1710            "RocksDB entry count should be unchanged when sf_tip == checkpoint"
1711        );
1712
1713        // Verify exact entries are preserved
1714        for (before, after) in entries_before.iter().zip(entries_after.iter()) {
1715            assert_eq!(before.0, after.0, "Entry key should be unchanged");
1716            assert_eq!(before.1, after.1, "Entry block list should be unchanged");
1717        }
1718    }
1719}