Skip to main content

reth_provider/providers/rocksdb/
invariants.rs

1//! Invariant checking for `RocksDB` tables.
2//!
3//! This module provides consistency checks for tables stored in `RocksDB`, similar to the
4//! consistency checks for static files. The goal is to detect and potentially heal
5//! inconsistencies between `RocksDB` data and MDBX checkpoints.
6
7use super::RocksDBProvider;
8use crate::StaticFileProviderFactory;
9use alloy_eips::eip2718::Encodable2718;
10use alloy_primitives::BlockNumber;
11use rayon::prelude::*;
12use reth_db_api::tables;
13use reth_stages_types::StageId;
14use reth_static_file_types::StaticFileSegment;
15use reth_storage_api::{
16    BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StageCheckpointReader,
17    StorageChangeSetReader, StorageSettingsCache, TransactionsProvider,
18};
19use reth_storage_errors::provider::ProviderResult;
20use std::collections::HashSet;
21
22/// Batch size for changeset iteration during history healing.
23/// Balances memory usage against iteration overhead.
24const HEAL_HISTORY_BATCH_SIZE: u64 = 10_000;
25
26impl RocksDBProvider {
27    /// Checks consistency of `RocksDB` tables against MDBX stage checkpoints.
28    ///
29    /// Returns an unwind target block number if the pipeline needs to unwind to rebuild
30    /// `RocksDB` data. Returns `None` if all invariants pass or if inconsistencies were healed.
31    ///
32    /// # Invariants checked
33    ///
34    /// For `TransactionHashNumbers`:
35    /// - The maximum `TxNumber` value should not exceed what the `TransactionLookup` stage
36    ///   checkpoint indicates has been processed.
37    /// - If `RocksDB` is ahead, excess entries are pruned (healed).
38    /// - If `RocksDB` is behind, an unwind is required.
39    ///
40    /// For `StoragesHistory` and `AccountsHistory`:
41    /// - Uses changesets to heal stale entries when static file tip > checkpoint.
42    ///
43    /// # Requirements
44    ///
45    /// For pruning `TransactionHashNumbers`, the provider must be able to supply transaction
46    /// data (typically from static files) so that transaction hashes can be computed. This
47    /// implies that static files should be ahead of or in sync with `RocksDB`.
48    pub fn check_consistency<Provider>(
49        &self,
50        provider: &Provider,
51    ) -> ProviderResult<Option<BlockNumber>>
52    where
53        Provider: DBProvider
54            + StageCheckpointReader
55            + StorageSettingsCache
56            + StaticFileProviderFactory
57            + BlockBodyIndicesProvider
58            + StorageChangeSetReader
59            + ChangeSetReader
60            + TransactionsProvider<Transaction: Encodable2718>,
61    {
62        let mut unwind_target: Option<BlockNumber> = None;
63
64        // Heal TransactionHashNumbers if stored in RocksDB
65        if provider.cached_storage_settings().storage_v2 &&
66            let Some(target) = self.heal_transaction_hash_numbers(provider)?
67        {
68            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
69        }
70
71        // Heal StoragesHistory if stored in RocksDB
72        if provider.cached_storage_settings().storage_v2 &&
73            let Some(target) = self.heal_storages_history(provider)?
74        {
75            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
76        }
77
78        // Heal AccountsHistory if stored in RocksDB
79        if provider.cached_storage_settings().storage_v2 &&
80            let Some(target) = self.heal_accounts_history(provider)?
81        {
82            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
83        }
84
85        Ok(unwind_target)
86    }
87
88    /// Heals the `TransactionHashNumbers` table.
89    ///
90    /// - Fast path: if checkpoint == 0 AND `RocksDB` has data, clear everything
91    /// - If `sf_tip` < checkpoint, return unwind target (static files behind)
92    /// - If `sf_tip` == checkpoint, nothing to do
93    /// - If `sf_tip` > checkpoint, heal via transaction ranges in batches
94    fn heal_transaction_hash_numbers<Provider>(
95        &self,
96        provider: &Provider,
97    ) -> ProviderResult<Option<BlockNumber>>
98    where
99        Provider: DBProvider
100            + StageCheckpointReader
101            + StaticFileProviderFactory
102            + BlockBodyIndicesProvider
103            + TransactionsProvider<Transaction: Encodable2718>,
104    {
105        let checkpoint = provider
106            .get_stage_checkpoint(StageId::TransactionLookup)?
107            .map(|cp| cp.block_number)
108            .unwrap_or(0);
109
110        let sf_tip = provider
111            .static_file_provider()
112            .get_highest_static_file_block(StaticFileSegment::Transactions)
113            .unwrap_or(0);
114
115        // Fast path: if checkpoint is 0 and RocksDB has data, clear everything.
116        if checkpoint == 0 && self.first::<tables::TransactionHashNumbers>()?.is_some() {
117            tracing::info!(
118                target: "reth::providers::rocksdb",
119                "TransactionHashNumbers has data but checkpoint is 0, clearing all"
120            );
121            self.clear::<tables::TransactionHashNumbers>()?;
122            return Ok(None);
123        }
124
125        if sf_tip < checkpoint {
126            // This should never happen in normal operation - static files are always committed
127            // before RocksDB. If we get here, something is seriously wrong. The unwind is a
128            // best-effort attempt but is probably futile.
129            tracing::warn!(
130                target: "reth::providers::rocksdb",
131                sf_tip,
132                checkpoint,
133                "TransactionHashNumbers: static file tip behind checkpoint, unwind needed"
134            );
135            return Ok(Some(sf_tip));
136        }
137
138        // sf_tip == checkpoint - nothing to do
139        if sf_tip == checkpoint {
140            return Ok(None);
141        }
142
143        // Get end tx from static files (authoritative for sf_tip)
144        let sf_tip_end_tx = provider
145            .static_file_provider()
146            .get_highest_static_file_tx(StaticFileSegment::Transactions)
147            .unwrap_or(0);
148
149        // Get the first tx after the checkpoint block from MDBX (authoritative up to checkpoint)
150        let checkpoint_next_tx = provider
151            .block_body_indices(checkpoint)?
152            .map(|indices| indices.next_tx_num())
153            .unwrap_or(0);
154
155        if sf_tip_end_tx < checkpoint_next_tx {
156            // This should never happen in normal operation - static files should have all
157            // transactions up to sf_tip. If we get here, something is seriously wrong.
158            // The unwind is a best-effort attempt but is probably futile.
159            tracing::warn!(
160                target: "reth::providers::rocksdb",
161                sf_tip_end_tx,
162                checkpoint_next_tx,
163                checkpoint,
164                sf_tip,
165                "TransactionHashNumbers: static file tx tip behind checkpoint, unwind needed"
166            );
167            return Ok(Some(sf_tip));
168        }
169
170        tracing::info!(
171            target: "reth::providers::rocksdb",
172            checkpoint,
173            sf_tip,
174            checkpoint_next_tx,
175            sf_tip_end_tx,
176            "TransactionHashNumbers: healing via transaction ranges"
177        );
178
179        const BATCH_SIZE: u64 = 10_000;
180        let mut batch_start = checkpoint_next_tx;
181
182        while batch_start <= sf_tip_end_tx {
183            let batch_end = batch_start.saturating_add(BATCH_SIZE - 1).min(sf_tip_end_tx);
184
185            tracing::debug!(
186                target: "reth::providers::rocksdb",
187                batch_start,
188                batch_end,
189                "Pruning TransactionHashNumbers batch"
190            );
191
192            self.prune_transaction_hash_numbers_in_range(provider, batch_start..=batch_end)?;
193
194            batch_start = batch_end.saturating_add(1);
195        }
196
197        Ok(None)
198    }
199
200    /// Prunes `TransactionHashNumbers` entries for transactions in the given range.
201    ///
202    /// This fetches transactions from the provider, computes their hashes in parallel,
203    /// and deletes the corresponding entries from `RocksDB` by key. This approach is more
204    /// scalable than iterating all rows because it only processes the transactions that
205    /// need to be pruned.
206    ///
207    /// # Requirements
208    ///
209    /// The provider must be able to supply transaction data (typically from static files)
210    /// so that transaction hashes can be computed. This implies that static files should
211    /// be ahead of or in sync with `RocksDB`.
212    fn prune_transaction_hash_numbers_in_range<Provider>(
213        &self,
214        provider: &Provider,
215        tx_range: std::ops::RangeInclusive<u64>,
216    ) -> ProviderResult<()>
217    where
218        Provider: TransactionsProvider<Transaction: Encodable2718>,
219    {
220        if tx_range.is_empty() {
221            return Ok(());
222        }
223
224        // Fetch transactions in the range and compute their hashes in parallel
225        let hashes: Vec<_> = provider
226            .transactions_by_tx_range(tx_range.clone())?
227            .into_par_iter()
228            .map(|tx| tx.trie_hash())
229            .collect();
230
231        if !hashes.is_empty() {
232            tracing::info!(
233                target: "reth::providers::rocksdb",
234                deleted_count = hashes.len(),
235                tx_range_start = *tx_range.start(),
236                tx_range_end = *tx_range.end(),
237                "Pruning TransactionHashNumbers entries by tx range"
238            );
239
240            let mut batch = self.batch();
241            for hash in hashes {
242                batch.delete::<tables::TransactionHashNumbers>(hash)?;
243            }
244            batch.commit()?;
245        }
246
247        Ok(())
248    }
249
250    /// Heals the `StoragesHistory` table by removing stale entries.
251    ///
252    /// Returns an unwind target if static file tip is behind checkpoint (cannot heal).
253    /// Otherwise iterates changesets in batches to identify and unwind affected keys.
254    fn heal_storages_history<Provider>(
255        &self,
256        provider: &Provider,
257    ) -> ProviderResult<Option<BlockNumber>>
258    where
259        Provider:
260            DBProvider + StageCheckpointReader + StaticFileProviderFactory + StorageChangeSetReader,
261    {
262        let checkpoint = provider
263            .get_stage_checkpoint(StageId::IndexStorageHistory)?
264            .map(|cp| cp.block_number)
265            .unwrap_or(0);
266
267        // Fast path: if checkpoint is 0 and RocksDB has data, clear everything.
268        if checkpoint == 0 && self.first::<tables::StoragesHistory>()?.is_some() {
269            tracing::info!(
270                target: "reth::providers::rocksdb",
271                "StoragesHistory has data but checkpoint is 0, clearing all"
272            );
273            self.clear::<tables::StoragesHistory>()?;
274            return Ok(None);
275        }
276
277        let sf_tip = provider
278            .static_file_provider()
279            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
280            .unwrap_or(0);
281
282        if sf_tip < checkpoint {
283            // This should never happen in normal operation - static files are always
284            // committed before RocksDB. If we get here, something is seriously wrong.
285            // The unwind is a best-effort attempt but is probably futile.
286            tracing::warn!(
287                target: "reth::providers::rocksdb",
288                sf_tip,
289                checkpoint,
290                "StoragesHistory: static file tip behind checkpoint, unwind needed"
291            );
292            return Ok(Some(sf_tip));
293        }
294
295        if sf_tip == checkpoint {
296            return Ok(None);
297        }
298
299        let total_blocks = sf_tip - checkpoint;
300        tracing::info!(
301            target: "reth::providers::rocksdb",
302            checkpoint,
303            sf_tip,
304            total_blocks,
305            "StoragesHistory: healing via changesets"
306        );
307
308        let mut batch_start = checkpoint + 1;
309        let mut batch_num = 0u64;
310        let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
311
312        while batch_start <= sf_tip {
313            let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
314            batch_num += 1;
315
316            let changesets = provider.storage_changesets_range(batch_start..=batch_end)?;
317
318            let unique_keys: HashSet<_> = changesets
319                .into_iter()
320                .map(|(block_addr, entry)| {
321                    // entry.key is a hashed storage key
322                    (block_addr.address(), entry.key.as_b256(), checkpoint + 1)
323                })
324                .collect();
325            let indices: Vec<_> = unique_keys.into_iter().collect();
326
327            if !indices.is_empty() {
328                tracing::info!(
329                    target: "reth::providers::rocksdb",
330                    batch_num,
331                    total_batches,
332                    batch_start,
333                    batch_end,
334                    indices_count = indices.len(),
335                    "StoragesHistory: unwinding batch"
336                );
337
338                let batch = self.unwind_storage_history_indices(&indices)?;
339                self.commit_batch(batch)?;
340            }
341
342            batch_start = batch_end + 1;
343        }
344
345        Ok(None)
346    }
347
348    /// Heals the `AccountsHistory` table by removing stale entries.
349    ///
350    /// Returns an unwind target if static file tip is behind checkpoint (cannot heal).
351    /// Otherwise iterates changesets in batches to identify and unwind affected keys.
352    fn heal_accounts_history<Provider>(
353        &self,
354        provider: &Provider,
355    ) -> ProviderResult<Option<BlockNumber>>
356    where
357        Provider: DBProvider + StageCheckpointReader + StaticFileProviderFactory + ChangeSetReader,
358    {
359        let checkpoint = provider
360            .get_stage_checkpoint(StageId::IndexAccountHistory)?
361            .map(|cp| cp.block_number)
362            .unwrap_or(0);
363
364        // Fast path: if checkpoint is 0 and RocksDB has data, clear everything.
365        if checkpoint == 0 && self.first::<tables::AccountsHistory>()?.is_some() {
366            tracing::info!(
367                target: "reth::providers::rocksdb",
368                "AccountsHistory has data but checkpoint is 0, clearing all"
369            );
370            self.clear::<tables::AccountsHistory>()?;
371            return Ok(None);
372        }
373
374        let sf_tip = provider
375            .static_file_provider()
376            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
377            .unwrap_or(0);
378
379        if sf_tip < checkpoint {
380            // This should never happen in normal operation - static files are always
381            // committed before RocksDB. If we get here, something is seriously wrong.
382            // The unwind is a best-effort attempt but is probably futile.
383            tracing::warn!(
384                target: "reth::providers::rocksdb",
385                sf_tip,
386                checkpoint,
387                "AccountsHistory: static file tip behind checkpoint, unwind needed"
388            );
389            return Ok(Some(sf_tip));
390        }
391
392        if sf_tip == checkpoint {
393            return Ok(None);
394        }
395
396        let total_blocks = sf_tip - checkpoint;
397        tracing::info!(
398            target: "reth::providers::rocksdb",
399            checkpoint,
400            sf_tip,
401            total_blocks,
402            "AccountsHistory: healing via changesets"
403        );
404
405        let mut batch_start = checkpoint + 1;
406        let mut batch_num = 0u64;
407        let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
408
409        while batch_start <= sf_tip {
410            let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
411            batch_num += 1;
412
413            let changesets = provider.account_changesets_range(batch_start..=batch_end)?;
414
415            let mut addresses = HashSet::with_capacity(changesets.len());
416            addresses.extend(changesets.iter().map(|(_, cs)| cs.address));
417            let unwind_from = checkpoint + 1;
418            let indices: Vec<_> = addresses.into_iter().map(|addr| (addr, unwind_from)).collect();
419
420            if !indices.is_empty() {
421                tracing::info!(
422                    target: "reth::providers::rocksdb",
423                    batch_num,
424                    total_batches,
425                    batch_start,
426                    batch_end,
427                    indices_count = indices.len(),
428                    "AccountsHistory: unwinding batch"
429                );
430
431                let batch = self.unwind_account_history_indices(&indices)?;
432                self.commit_batch(batch)?;
433            }
434
435            batch_start = batch_end + 1;
436        }
437
438        Ok(None)
439    }
440}
441
442#[cfg(test)]
443mod tests {
444    use super::*;
445    use crate::{
446        providers::{rocksdb::RocksDBBuilder, static_file::StaticFileWriter},
447        test_utils::create_test_provider_factory,
448        BlockWriter, DatabaseProviderFactory, StageCheckpointWriter, TransactionsProvider,
449    };
450    use alloy_primitives::{Address, B256};
451    use reth_db::cursor::{DbCursorRO, DbCursorRW};
452    use reth_db_api::{
453        models::{storage_sharded_key::StorageShardedKey, StorageSettings},
454        tables::{self, BlockNumberList},
455        transaction::DbTxMut,
456    };
457    use reth_stages_types::StageCheckpoint;
458    use reth_testing_utils::generators::{self, BlockRangeParams};
459    use tempfile::TempDir;
460
461    #[test]
462    fn test_first_last_empty_rocksdb() {
463        let temp_dir = TempDir::new().unwrap();
464        let provider = RocksDBBuilder::new(temp_dir.path())
465            .with_table::<tables::TransactionHashNumbers>()
466            .with_table::<tables::StoragesHistory>()
467            .build()
468            .unwrap();
469
470        // Empty RocksDB, no checkpoints - should be consistent
471        let first = provider.first::<tables::TransactionHashNumbers>().unwrap();
472        let last = provider.last::<tables::TransactionHashNumbers>().unwrap();
473
474        assert!(first.is_none());
475        assert!(last.is_none());
476    }
477
478    #[test]
479    fn test_first_last_with_data() {
480        let temp_dir = TempDir::new().unwrap();
481        let provider = RocksDBBuilder::new(temp_dir.path())
482            .with_table::<tables::TransactionHashNumbers>()
483            .build()
484            .unwrap();
485
486        // Insert some data
487        let tx_hash = B256::from([1u8; 32]);
488        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
489
490        // RocksDB has data
491        let last = provider.last::<tables::TransactionHashNumbers>().unwrap();
492        assert!(last.is_some());
493        assert_eq!(last.unwrap().1, 100);
494    }
495
496    #[test]
497    fn test_check_consistency_empty_rocksdb_no_checkpoint_is_ok() {
498        let temp_dir = TempDir::new().unwrap();
499        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
500
501        // Create a test provider factory for MDBX
502        let factory = create_test_provider_factory();
503        factory.set_storage_settings_cache(StorageSettings::v2());
504
505        let provider = factory.database_provider_ro().unwrap();
506
507        // Empty RocksDB and no checkpoints - should be consistent (None = no unwind needed)
508        let result = rocksdb.check_consistency(&provider).unwrap();
509        assert_eq!(result, None);
510    }
511
512    #[test]
513    fn test_check_consistency_empty_rocksdb_with_checkpoint_is_first_run() {
514        let temp_dir = TempDir::new().unwrap();
515        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
516
517        // Create a test provider factory for MDBX
518        let factory = create_test_provider_factory();
519        factory.set_storage_settings_cache(StorageSettings::v2());
520
521        // Set a checkpoint indicating we should have processed up to block 100
522        {
523            let provider = factory.database_provider_rw().unwrap();
524            provider
525                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(100))
526                .unwrap();
527            provider.commit().unwrap();
528        }
529
530        let provider = factory.database_provider_ro().unwrap();
531
532        // RocksDB is empty but checkpoint says block 100 was processed.
533        // Since static file tip defaults to 0 when None, and 0 < 100, an unwind is triggered.
534        let result = rocksdb.check_consistency(&provider).unwrap();
535        assert_eq!(result, Some(0), "Static file tip (0) behind checkpoint (100) triggers unwind");
536    }
537
538    /// Tests that when checkpoint=0 and `RocksDB` has data, all entries are pruned.
539    /// This simulates a crash recovery scenario where the checkpoint was lost.
540    #[test]
541    fn test_check_consistency_checkpoint_zero_with_rocksdb_data_prunes_all() {
542        let temp_dir = TempDir::new().unwrap();
543        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
544
545        let factory = create_test_provider_factory();
546        factory.set_storage_settings_cache(StorageSettings::v2());
547
548        // Generate blocks with real transactions and insert them
549        let mut rng = generators::rng();
550        let blocks = generators::random_block_range(
551            &mut rng,
552            0..=2,
553            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
554        );
555
556        let mut tx_hashes = Vec::new();
557        {
558            let provider = factory.database_provider_rw().unwrap();
559            let mut tx_count = 0u64;
560            for block in &blocks {
561                provider
562                    .insert_block(&block.clone().try_recover().expect("recover block"))
563                    .unwrap();
564                for tx in &block.body().transactions {
565                    let hash = tx.trie_hash();
566                    tx_hashes.push(hash);
567                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
568                    tx_count += 1;
569                }
570            }
571            provider.commit().unwrap();
572        }
573
574        // Explicitly clear the checkpoints to simulate crash recovery
575        {
576            let provider = factory.database_provider_rw().unwrap();
577            provider
578                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(0))
579                .unwrap();
580            provider
581                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
582                .unwrap();
583            provider
584                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
585                .unwrap();
586            provider.commit().unwrap();
587        }
588
589        // Verify RocksDB data exists
590        assert!(rocksdb.last::<tables::TransactionHashNumbers>().unwrap().is_some());
591
592        let provider = factory.database_provider_ro().unwrap();
593
594        // checkpoint = 0 but RocksDB has data.
595        // This means RocksDB has stale data that should be cleared.
596        let result = rocksdb.check_consistency(&provider).unwrap();
597        assert_eq!(result, None, "Should heal by clearing, no unwind needed");
598
599        // Verify data was cleared
600        for hash in &tx_hashes {
601            assert!(
602                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
603                "RocksDB should be empty after pruning"
604            );
605        }
606    }
607
608    #[test]
609    fn test_check_consistency_storages_history_empty_with_checkpoint_is_first_run() {
610        let temp_dir = TempDir::new().unwrap();
611        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
612
613        // Create a test provider factory for MDBX
614        let factory = create_test_provider_factory();
615        factory.set_storage_settings_cache(StorageSettings::v2());
616
617        // Set a checkpoint indicating we should have processed up to block 100
618        {
619            let provider = factory.database_provider_rw().unwrap();
620            provider
621                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
622                .unwrap();
623            provider.commit().unwrap();
624        }
625
626        let provider = factory.database_provider_ro().unwrap();
627
628        // RocksDB is empty but checkpoint says block 100 was processed.
629        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
630        // This should never happen in normal operation.
631        let result = rocksdb.check_consistency(&provider).unwrap();
632        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
633    }
634
635    #[test]
636    fn test_check_consistency_storages_history_has_data_no_checkpoint_prunes_data() {
637        let temp_dir = TempDir::new().unwrap();
638        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
639
640        // Insert data into RocksDB
641        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
642        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
643        rocksdb.put::<tables::StoragesHistory>(key, &block_list).unwrap();
644
645        // Verify data exists
646        assert!(rocksdb.last::<tables::StoragesHistory>().unwrap().is_some());
647
648        // Create a test provider factory for MDBX with NO checkpoint
649        let factory = create_test_provider_factory();
650        factory.set_storage_settings_cache(StorageSettings::v2());
651
652        let provider = factory.database_provider_ro().unwrap();
653
654        // RocksDB has data but checkpoint is 0
655        // This means RocksDB has stale data that should be pruned (healed)
656        let result = rocksdb.check_consistency(&provider).unwrap();
657        assert_eq!(result, None, "Should heal by pruning, no unwind needed");
658
659        // Verify data was pruned
660        assert!(
661            rocksdb.last::<tables::StoragesHistory>().unwrap().is_none(),
662            "RocksDB should be empty after pruning"
663        );
664    }
665    #[test]
666    fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() {
667        let temp_dir = TempDir::new().unwrap();
668        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
669
670        let factory = create_test_provider_factory();
671        factory.set_storage_settings_cache(StorageSettings::v2());
672
673        // Generate blocks with real transactions (blocks 0-2, 6 transactions total)
674        let mut rng = generators::rng();
675        let blocks = generators::random_block_range(
676            &mut rng,
677            0..=2,
678            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
679        );
680
681        {
682            let provider = factory.database_provider_rw().unwrap();
683            let mut tx_count = 0u64;
684            for block in &blocks {
685                provider
686                    .insert_block(&block.clone().try_recover().expect("recover block"))
687                    .unwrap();
688                for tx in &block.body().transactions {
689                    let hash = tx.trie_hash();
690                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
691                    tx_count += 1;
692                }
693            }
694            provider.commit().unwrap();
695        }
696
697        // Set checkpoint to block 10 (beyond our actual data at block 2)
698        // sf_tip is at block 2, checkpoint is at block 10
699        // Since sf_tip < checkpoint, we need to unwind to sf_tip
700        {
701            let provider = factory.database_provider_rw().unwrap();
702            provider
703                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(10))
704                .unwrap();
705            // Reset history checkpoints so they don't interfere
706            provider
707                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
708                .unwrap();
709            provider
710                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
711                .unwrap();
712            provider.commit().unwrap();
713        }
714
715        let provider = factory.database_provider_ro().unwrap();
716
717        // sf_tip (2) < checkpoint (10), so unwind to sf_tip is needed
718        let result = rocksdb.check_consistency(&provider).unwrap();
719        assert_eq!(result, Some(2), "sf_tip < checkpoint requires unwind to sf_tip");
720    }
721
722    #[test]
723    fn test_check_consistency_rocksdb_ahead_of_checkpoint_prunes_excess() {
724        let temp_dir = TempDir::new().unwrap();
725        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
726
727        // Create a test provider factory for MDBX
728        let factory = create_test_provider_factory();
729        factory.set_storage_settings_cache(StorageSettings::v2());
730
731        // Generate blocks with real transactions:
732        // Blocks 0-5, each with 2 transactions = 12 total transactions (0-11)
733        let mut rng = generators::rng();
734        let blocks = generators::random_block_range(
735            &mut rng,
736            0..=5,
737            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
738        );
739
740        // Track which hashes belong to which blocks
741        let mut tx_hashes = Vec::new();
742        let mut tx_count = 0u64;
743        {
744            let provider = factory.database_provider_rw().unwrap();
745            // Insert ALL blocks (0-5) to write transactions to static files
746            for block in &blocks {
747                provider
748                    .insert_block(&block.clone().try_recover().expect("recover block"))
749                    .unwrap();
750                for tx in &block.body().transactions {
751                    let hash = tx.trie_hash();
752                    tx_hashes.push(hash);
753                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
754                    tx_count += 1;
755                }
756            }
757            provider.commit().unwrap();
758        }
759
760        // Simulate crash recovery scenario:
761        // MDBX was unwound to block 2, but RocksDB and static files still have more data.
762        // Remove TransactionBlocks entries for blocks 3-5 to simulate MDBX unwind.
763        {
764            let provider = factory.database_provider_rw().unwrap();
765            // Delete TransactionBlocks entries for tx > 5 (i.e., for blocks 3-5)
766            // TransactionBlocks maps last_tx_in_block -> block_number
767            // After unwind, only entries for blocks 0-2 should remain (tx 5 -> block 2)
768            let mut cursor = provider.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();
769            // Walk and delete entries where block > 2
770            let mut to_delete = Vec::new();
771            let mut walker = cursor.walk(Some(0)).unwrap();
772            while let Some((tx_num, block_num)) = walker.next().transpose().unwrap() {
773                if block_num > 2 {
774                    to_delete.push(tx_num);
775                }
776            }
777            drop(walker);
778            for tx_num in to_delete {
779                cursor.seek_exact(tx_num).unwrap();
780                cursor.delete_current().unwrap();
781            }
782
783            // Set checkpoint to block 2
784            provider
785                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
786                .unwrap();
787            // Reset history checkpoints so they don't interfere
788            provider
789                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
790                .unwrap();
791            provider
792                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
793                .unwrap();
794            provider.commit().unwrap();
795        }
796
797        let provider = factory.database_provider_ro().unwrap();
798
799        // RocksDB has tx hashes for all blocks (0-5)
800        // MDBX TransactionBlocks only goes up to tx 5 (block 2)
801        // Static files have data for all txs (0-11)
802        // This means RocksDB is ahead and should prune entries for tx 6-11
803        let result = rocksdb.check_consistency(&provider).unwrap();
804        assert_eq!(result, None, "Should heal by pruning, no unwind needed");
805
806        // Verify: hashes for blocks 0-2 (tx 0-5) should remain, blocks 3-5 (tx 6-11) should be
807        // pruned First 6 hashes should remain
808        for (i, hash) in tx_hashes.iter().take(6).enumerate() {
809            assert!(
810                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_some(),
811                "tx {} should remain",
812                i
813            );
814        }
815        // Last 6 hashes should be pruned
816        for (i, hash) in tx_hashes.iter().skip(6).enumerate() {
817            assert!(
818                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
819                "tx {} should be pruned",
820                i + 6
821            );
822        }
823    }
824
825    #[test]
826    fn test_check_consistency_storages_history_sentinel_only_with_checkpoint_is_first_run() {
827        let temp_dir = TempDir::new().unwrap();
828        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
829
830        // Insert ONLY sentinel entries (highest_block_number = u64::MAX)
831        // This simulates a scenario where history tracking started but no shards were completed
832        let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
833        let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
834        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
835        rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
836        rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
837
838        // Verify entries exist (not empty table)
839        assert!(rocksdb.first::<tables::StoragesHistory>().unwrap().is_some());
840
841        // Create a test provider factory for MDBX
842        let factory = create_test_provider_factory();
843        factory.set_storage_settings_cache(StorageSettings::v2());
844
845        // Set a checkpoint indicating we should have processed up to block 100
846        {
847            let provider = factory.database_provider_rw().unwrap();
848            provider
849                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
850                .unwrap();
851            provider.commit().unwrap();
852        }
853
854        let provider = factory.database_provider_ro().unwrap();
855
856        // RocksDB has only sentinel entries but checkpoint is set.
857        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
858        // This should never happen in normal operation.
859        let result = rocksdb.check_consistency(&provider).unwrap();
860        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
861    }
862
863    #[test]
864    fn test_check_consistency_accounts_history_sentinel_only_with_checkpoint_is_first_run() {
865        use reth_db_api::models::ShardedKey;
866
867        let temp_dir = TempDir::new().unwrap();
868        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
869
870        // Insert ONLY sentinel entries (highest_block_number = u64::MAX)
871        let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
872        let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
873        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
874        rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
875        rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
876
877        // Verify entries exist (not empty table)
878        assert!(rocksdb.first::<tables::AccountsHistory>().unwrap().is_some());
879
880        // Create a test provider factory for MDBX
881        let factory = create_test_provider_factory();
882        factory.set_storage_settings_cache(StorageSettings::v2());
883
884        // Set a checkpoint indicating we should have processed up to block 100
885        {
886            let provider = factory.database_provider_rw().unwrap();
887            provider
888                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
889                .unwrap();
890            provider.commit().unwrap();
891        }
892
893        let provider = factory.database_provider_ro().unwrap();
894
895        // RocksDB has only sentinel entries but checkpoint is set.
896        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
897        // This should never happen in normal operation.
898        let result = rocksdb.check_consistency(&provider).unwrap();
899        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
900    }
901
902    /// Test that pruning works by fetching transactions and computing their hashes,
903    /// rather than iterating all rows. This test uses random blocks with unique
904    /// transactions so we can verify the correct entries are pruned.
905    #[test]
906    fn test_prune_transaction_hash_numbers_by_range() {
907        let temp_dir = TempDir::new().unwrap();
908        let rocksdb = RocksDBBuilder::new(temp_dir.path())
909            .with_table::<tables::TransactionHashNumbers>()
910            .build()
911            .unwrap();
912
913        // Create a test provider factory for MDBX
914        let factory = create_test_provider_factory();
915        factory.set_storage_settings_cache(StorageSettings::v2());
916
917        // Generate random blocks with unique transactions
918        // Block 0 (genesis) has no transactions
919        // Blocks 1-5 each have 2 transactions = 10 transactions total
920        let mut rng = generators::rng();
921        let blocks = generators::random_block_range(
922            &mut rng,
923            0..=5,
924            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
925        );
926
927        // Insert blocks into the database
928        let mut tx_count = 0u64;
929        let mut tx_hashes = Vec::new();
930        {
931            let provider = factory.database_provider_rw().unwrap();
932
933            for block in &blocks {
934                provider
935                    .insert_block(&block.clone().try_recover().expect("recover block"))
936                    .unwrap();
937
938                // Store transaction hash -> tx_number mappings in RocksDB
939                for tx in &block.body().transactions {
940                    let hash = tx.trie_hash();
941                    tx_hashes.push(hash);
942                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
943                    tx_count += 1;
944                }
945            }
946
947            // Set checkpoint to block 2 (meaning we should only have tx hashes for blocks 0-2)
948            // Blocks 0, 1, 2 have 6 transactions (2 each), so tx 0-5 should remain
949            provider
950                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
951                .unwrap();
952            provider.commit().unwrap();
953        }
954
955        // At this point:
956        // - RocksDB has tx hashes for blocks 0-5 (10 total: 2 per block)
957        // - Checkpoint says we only processed up to block 2
958        // - We need to prune tx hashes for blocks 3, 4, 5 (tx 6-9)
959
960        // Verify RocksDB has the expected number of entries before pruning
961        let rocksdb_count_before: usize =
962            rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().count();
963        assert_eq!(
964            rocksdb_count_before, tx_count as usize,
965            "RocksDB should have all {} transaction hashes before pruning",
966            tx_count
967        );
968
969        let provider = factory.database_provider_ro().unwrap();
970
971        // Verify we can fetch transactions by tx range
972        let all_txs = provider.transactions_by_tx_range(0..tx_count).unwrap();
973        assert_eq!(all_txs.len(), tx_count as usize, "Should be able to fetch all transactions");
974
975        // Verify the hashes match between what we stored and what we compute from fetched txs
976        for (i, tx) in all_txs.iter().enumerate() {
977            let computed_hash = tx.trie_hash();
978            assert_eq!(
979                computed_hash, tx_hashes[i],
980                "Hash mismatch for tx {}: stored {:?} vs computed {:?}",
981                i, tx_hashes[i], computed_hash
982            );
983        }
984
985        // Blocks 0, 1, 2 have 2 tx each = 6 tx total (indices 0-5)
986        // We want to keep tx 0-5, prune tx 6-9
987        let max_tx_to_keep = 5u64;
988        let tx_to_prune_start = max_tx_to_keep + 1;
989
990        // Prune transactions 6-9 (blocks 3-5)
991        rocksdb
992            .prune_transaction_hash_numbers_in_range(&provider, tx_to_prune_start..=(tx_count - 1))
993            .expect("prune should succeed");
994
995        // Verify: transactions 0-5 should remain, 6-9 should be pruned
996        let mut remaining_count = 0;
997        for result in rocksdb.iter::<tables::TransactionHashNumbers>().unwrap() {
998            let (_hash, tx_num) = result.unwrap();
999            assert!(
1000                tx_num <= max_tx_to_keep,
1001                "Transaction {} should have been pruned (> {})",
1002                tx_num,
1003                max_tx_to_keep
1004            );
1005            remaining_count += 1;
1006        }
1007        assert_eq!(
1008            remaining_count,
1009            (max_tx_to_keep + 1) as usize,
1010            "Should have {} transactions (0-{})",
1011            max_tx_to_keep + 1,
1012            max_tx_to_keep
1013        );
1014    }
1015
1016    #[test]
1017    fn test_check_consistency_accounts_history_empty_with_checkpoint_is_first_run() {
1018        let temp_dir = TempDir::new().unwrap();
1019        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1020
1021        // Create a test provider factory for MDBX
1022        let factory = create_test_provider_factory();
1023        factory.set_storage_settings_cache(StorageSettings::v2());
1024
1025        // Set a checkpoint indicating we should have processed up to block 100
1026        {
1027            let provider = factory.database_provider_rw().unwrap();
1028            provider
1029                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
1030                .unwrap();
1031            provider.commit().unwrap();
1032        }
1033
1034        let provider = factory.database_provider_ro().unwrap();
1035
1036        // RocksDB is empty but checkpoint says block 100 was processed.
1037        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
1038        // This should never happen in normal operation.
1039        let result = rocksdb.check_consistency(&provider).unwrap();
1040        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
1041    }
1042
1043    #[test]
1044    fn test_check_consistency_accounts_history_has_data_no_checkpoint_prunes_data() {
1045        use reth_db_api::models::ShardedKey;
1046
1047        let temp_dir = TempDir::new().unwrap();
1048        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1049
1050        // Insert data into RocksDB
1051        let key = ShardedKey::new(Address::ZERO, 50);
1052        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
1053        rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
1054
1055        // Verify data exists
1056        assert!(rocksdb.last::<tables::AccountsHistory>().unwrap().is_some());
1057
1058        // Create a test provider factory for MDBX with NO checkpoint
1059        let factory = create_test_provider_factory();
1060        factory.set_storage_settings_cache(StorageSettings::v2());
1061
1062        let provider = factory.database_provider_ro().unwrap();
1063
1064        // RocksDB has data but checkpoint is 0
1065        // This means RocksDB has stale data that should be pruned (healed)
1066        let result = rocksdb.check_consistency(&provider).unwrap();
1067        assert_eq!(result, None, "Should heal by pruning, no unwind needed");
1068
1069        // Verify data was pruned
1070        assert!(
1071            rocksdb.last::<tables::AccountsHistory>().unwrap().is_none(),
1072            "RocksDB should be empty after pruning"
1073        );
1074    }
1075
1076    #[test]
1077    fn test_check_consistency_accounts_history_sf_tip_equals_checkpoint_no_action() {
1078        use reth_db::models::AccountBeforeTx;
1079        use reth_db_api::models::ShardedKey;
1080        use reth_static_file_types::StaticFileSegment;
1081
1082        let temp_dir = TempDir::new().unwrap();
1083        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1084
1085        // Insert some AccountsHistory entries with various highest_block_numbers
1086        let key1 = ShardedKey::new(Address::ZERO, 50);
1087        let key2 = ShardedKey::new(Address::random(), 75);
1088        let key3 = ShardedKey::new(Address::random(), u64::MAX); // sentinel
1089        let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
1090        let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 75]);
1091        let block_list3 = BlockNumberList::new_pre_sorted([80, 90, 100]);
1092        rocksdb.put::<tables::AccountsHistory>(key1, &block_list1).unwrap();
1093        rocksdb.put::<tables::AccountsHistory>(key2, &block_list2).unwrap();
1094        rocksdb.put::<tables::AccountsHistory>(key3, &block_list3).unwrap();
1095
1096        // Capture RocksDB state before consistency check
1097        let entries_before: Vec<_> =
1098            rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|r| r.unwrap()).collect();
1099        assert_eq!(entries_before.len(), 3, "Should have 3 entries before check");
1100
1101        // Create a test provider factory for MDBX
1102        let factory = create_test_provider_factory();
1103        factory.set_storage_settings_cache(StorageSettings::v2());
1104
1105        // Write account changesets to static files for blocks 0-100
1106        {
1107            let sf_provider = factory.static_file_provider();
1108            let mut writer =
1109                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1110
1111            for block_num in 0..=100 {
1112                let changeset = vec![AccountBeforeTx { address: Address::random(), info: None }];
1113                writer.append_account_changeset(changeset, block_num).unwrap();
1114            }
1115
1116            writer.commit().unwrap();
1117        }
1118
1119        // Set IndexAccountHistory checkpoint to block 100 (same as sf_tip)
1120        {
1121            let provider = factory.database_provider_rw().unwrap();
1122            provider
1123                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
1124                .unwrap();
1125            provider.commit().unwrap();
1126        }
1127
1128        let provider = factory.database_provider_ro().unwrap();
1129
1130        // Verify sf_tip equals checkpoint (both at 100)
1131        let sf_tip = provider
1132            .static_file_provider()
1133            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
1134            .unwrap();
1135        assert_eq!(sf_tip, 100, "Static file tip should be 100");
1136
1137        // Run check_consistency - should return None (no unwind needed)
1138        let result = rocksdb.check_consistency(&provider).unwrap();
1139        assert_eq!(result, None, "sf_tip == checkpoint should not require unwind");
1140
1141        // Verify NO entries are deleted - RocksDB state unchanged
1142        let entries_after: Vec<_> =
1143            rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|r| r.unwrap()).collect();
1144
1145        assert_eq!(
1146            entries_after.len(),
1147            entries_before.len(),
1148            "RocksDB entry count should be unchanged when sf_tip == checkpoint"
1149        );
1150
1151        // Verify exact entries are preserved
1152        for (before, after) in entries_before.iter().zip(entries_after.iter()) {
1153            assert_eq!(before.0.key, after.0.key, "Entry key should be unchanged");
1154            assert_eq!(
1155                before.0.highest_block_number, after.0.highest_block_number,
1156                "Entry highest_block_number should be unchanged"
1157            );
1158            assert_eq!(before.1, after.1, "Entry block list should be unchanged");
1159        }
1160    }
1161
1162    /// Tests `StoragesHistory` changeset-based healing with enough blocks to trigger batching.
1163    ///
1164    /// Scenario:
1165    /// 1. Generate 15,000 blocks worth of storage changeset data (to exceed the 10k batch size)
1166    /// 2. Each block has 1 storage change (address + slot + value)
1167    /// 3. Write storage changesets to static files for all 15k blocks
1168    /// 4. Set `IndexStorageHistory` checkpoint to block 5000
1169    /// 5. Insert stale `StoragesHistory` entries in `RocksDB` for (address, slot) pairs that
1170    ///    changed in blocks 5001-15000
1171    /// 6. Run `check_consistency`
1172    /// 7. Verify stale entries for blocks > 5000 are pruned and batching worked
1173    #[test]
1174    fn test_check_consistency_storages_history_heals_via_changesets_large_range() {
1175        use alloy_primitives::U256;
1176        use reth_db_api::models::StorageBeforeTx;
1177
1178        const TOTAL_BLOCKS: u64 = 15_000;
1179        const CHECKPOINT_BLOCK: u64 = 5_000;
1180
1181        let temp_dir = TempDir::new().unwrap();
1182        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1183
1184        let factory = create_test_provider_factory();
1185        factory.set_storage_settings_cache(StorageSettings::v2());
1186
1187        // Helper to generate address from block number (reuses stack arrays)
1188        #[inline]
1189        fn make_address(block_num: u64) -> Address {
1190            let mut addr_bytes = [0u8; 20];
1191            addr_bytes[0..8].copy_from_slice(&block_num.to_le_bytes());
1192            Address::from(addr_bytes)
1193        }
1194
1195        // Helper to generate slot from block number (reuses stack arrays)
1196        #[inline]
1197        fn make_slot(block_num: u64) -> B256 {
1198            let mut slot_bytes = [0u8; 32];
1199            slot_bytes[0..8].copy_from_slice(&block_num.to_le_bytes());
1200            B256::from(slot_bytes)
1201        }
1202
1203        // Write storage changesets to static files for 15k blocks.
1204        // Each block has 1 storage change with a unique (address, slot) pair.
1205        {
1206            let sf_provider = factory.static_file_provider();
1207            let mut writer =
1208                sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1209
1210            // Reuse changeset vec to avoid repeated allocations
1211            let mut changeset = Vec::with_capacity(1);
1212
1213            for block_num in 0..TOTAL_BLOCKS {
1214                changeset.clear();
1215                changeset.push(StorageBeforeTx {
1216                    address: make_address(block_num),
1217                    key: make_slot(block_num),
1218                    value: U256::from(block_num),
1219                });
1220
1221                writer.append_storage_changeset(changeset.clone(), block_num).unwrap();
1222            }
1223
1224            writer.commit().unwrap();
1225        }
1226
1227        // Verify static files have data up to block 14999
1228        {
1229            let sf_provider = factory.static_file_provider();
1230            let highest = sf_provider
1231                .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
1232                .unwrap();
1233            assert_eq!(highest, TOTAL_BLOCKS - 1, "Static files should have blocks 0..14999");
1234        }
1235
1236        // Set IndexStorageHistory checkpoint to block 5000
1237        {
1238            let provider = factory.database_provider_rw().unwrap();
1239            provider
1240                .save_stage_checkpoint(
1241                    StageId::IndexStorageHistory,
1242                    StageCheckpoint::new(CHECKPOINT_BLOCK),
1243                )
1244                .unwrap();
1245            provider.commit().unwrap();
1246        }
1247
1248        // Insert stale StoragesHistory entries for blocks 5001-14999
1249        // These are (address, slot) pairs that changed after the checkpoint
1250        for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
1251            let key =
1252                StorageShardedKey::new(make_address(block_num), make_slot(block_num), block_num);
1253            let block_list = BlockNumberList::new_pre_sorted([block_num]);
1254            rocksdb.put::<tables::StoragesHistory>(key, &block_list).unwrap();
1255        }
1256
1257        // Verify RocksDB has stale entries before healing
1258        let count_before: usize = rocksdb.iter::<tables::StoragesHistory>().unwrap().count();
1259        assert_eq!(
1260            count_before,
1261            (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize,
1262            "Should have {} stale entries before healing",
1263            TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1
1264        );
1265
1266        // Run check_consistency - this should heal by pruning stale entries
1267        let provider = factory.database_provider_ro().unwrap();
1268        let result = rocksdb.check_consistency(&provider).unwrap();
1269        assert_eq!(result, None, "Should heal via changesets, no unwind needed");
1270
1271        // Verify all stale entries were pruned
1272        // After healing, entries with highest_block_number > checkpoint should be gone
1273        let mut remaining_stale = 0;
1274        for result in rocksdb.iter::<tables::StoragesHistory>().unwrap() {
1275            let (key, _) = result.unwrap();
1276            if key.sharded_key.highest_block_number > CHECKPOINT_BLOCK {
1277                remaining_stale += 1;
1278            }
1279        }
1280        assert_eq!(
1281            remaining_stale, 0,
1282            "All stale entries (block > {}) should be pruned",
1283            CHECKPOINT_BLOCK
1284        );
1285    }
1286
1287    /// Tests that healing preserves entries at exactly the checkpoint block.
1288    ///
1289    /// This catches off-by-one bugs where checkpoint block data is incorrectly deleted.
1290    #[test]
1291    fn test_check_consistency_storages_history_preserves_checkpoint_block() {
1292        use alloy_primitives::U256;
1293        use reth_db_api::models::StorageBeforeTx;
1294
1295        const CHECKPOINT_BLOCK: u64 = 100;
1296        const SF_TIP: u64 = 200;
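        // sf_tip (200) is ahead of the checkpoint (100), which is the condition that triggers
        // changeset-based healing for this table.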
1297
1298        let temp_dir = TempDir::new().unwrap();
1299        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1300
1301        let factory = create_test_provider_factory();
1302        factory.set_storage_settings_cache(StorageSettings::v2());
1303
1304        let checkpoint_addr = Address::repeat_byte(0xAA);
1305        let checkpoint_slot = B256::repeat_byte(0xBB);
1306        let stale_addr = Address::repeat_byte(0xCC);
1307        let stale_slot = B256::repeat_byte(0xDD);
1308
1309        // Write storage changesets to static files
1310        {
1311            let sf_provider = factory.static_file_provider();
1312            let mut writer =
1313                sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1314
1315            for block_num in 0..=SF_TIP {
1316                let changeset = if block_num == CHECKPOINT_BLOCK {
1317                    vec![StorageBeforeTx {
1318                        address: checkpoint_addr,
1319                        key: checkpoint_slot,
1320                        value: U256::from(block_num),
1321                    }]
1322                } else if block_num > CHECKPOINT_BLOCK {
1323                    vec![StorageBeforeTx {
1324                        address: stale_addr,
1325                        key: stale_slot,
1326                        value: U256::from(block_num),
1327                    }]
1328                } else {
1329                    vec![StorageBeforeTx {
1330                        address: Address::ZERO,
1331                        key: B256::ZERO,
1332                        value: U256::ZERO,
1333                    }]
1334                };
1335                writer.append_storage_changeset(changeset, block_num).unwrap();
1336            }
1337            writer.commit().unwrap();
1338        }
1339
1340        // Set checkpoint
1341        {
1342            let provider = factory.database_provider_rw().unwrap();
1343            provider
1344                .save_stage_checkpoint(
1345                    StageId::IndexStorageHistory,
1346                    StageCheckpoint::new(CHECKPOINT_BLOCK),
1347                )
1348                .unwrap();
1349            provider.commit().unwrap();
1350        }
1351
1352        // Insert entry AT the checkpoint block (should be preserved)
1353        let checkpoint_key =
1354            StorageShardedKey::new(checkpoint_addr, checkpoint_slot, CHECKPOINT_BLOCK);
1355        let checkpoint_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]);
1356        rocksdb.put::<tables::StoragesHistory>(checkpoint_key.clone(), &checkpoint_list).unwrap();
1357
1358        // Insert stale entry AFTER the checkpoint (should be removed)
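        // Both blocks in this shard (101 and 200) are above the checkpoint, so the entire entry is stale.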
1359        let stale_key = StorageShardedKey::new(stale_addr, stale_slot, SF_TIP);
1360        let stale_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]);
1361        rocksdb.put::<tables::StoragesHistory>(stale_key.clone(), &stale_list).unwrap();
1362
1363        // Run healing
1364        let provider = factory.database_provider_ro().unwrap();
1365        let result = rocksdb.check_consistency(&provider).unwrap();
1366        assert_eq!(result, None, "Should heal without unwind");
1367
1368        // Verify checkpoint block entry is PRESERVED
1369        let preserved = rocksdb.get::<tables::StoragesHistory>(checkpoint_key).unwrap();
1370        assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");
1371
1372        // Verify the stale entry after the checkpoint is removed
1373        let stale = rocksdb.get::<tables::StoragesHistory>(stale_key).unwrap();
1374        assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
1375    }
1376
1377    /// Tests `AccountsHistory` changeset-based healing with enough blocks to trigger batching.
1378    ///
1379    /// Scenario:
1380    /// 1. Generate 15,000 blocks worth of account changeset data (to exceed the 10k batch size)
1381    /// 2. Each block has 1 account change (a unique address derived from the block number)
1382    /// 3. Write account changesets to static files for all 15k blocks
1383    /// 4. Set `IndexAccountHistory` checkpoint to block 5000
1384    /// 5. Insert stale `AccountsHistory` entries in `RocksDB` for addresses that changed in blocks
1385    ///    5001-14999
1386    /// 6. Run `check_consistency`
1387    /// 7. Verify:
1388    ///    - Stale entries for blocks > 5000 are pruned
1389    ///    - The batching worked (no OOM, completed successfully)
1390    #[test]
1391    fn test_check_consistency_accounts_history_heals_via_changesets_large_range() {
1392        use reth_db::models::AccountBeforeTx;
1393        use reth_db_api::models::ShardedKey;
1394        use reth_static_file_types::StaticFileSegment;
1395
1396        let temp_dir = TempDir::new().unwrap();
1397        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1398
1399        // Create test provider factory
1400        let factory = create_test_provider_factory();
1401        factory.set_storage_settings_cache(StorageSettings::v2());
1402
1403        const TOTAL_BLOCKS: u64 = 15_000;
1404        const CHECKPOINT_BLOCK: u64 = 5_000;
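        // 15k blocks of changeset data is more than HEAL_HISTORY_BATCH_SIZE (10_000), so this
        // test exercises the batched iteration path described in the doc comment above.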
1405
1406        // Helper to generate address from block number (avoids pre-allocating 15k addresses)
1407        #[inline]
1408        fn make_address(block_num: u64) -> Address {
1409            let mut addr = Address::ZERO;
1410            addr.0[0..8].copy_from_slice(&block_num.to_le_bytes());
1411            addr
1412        }
1413
1414        // Write account changesets to static files for all 15k blocks
1415        {
1416            let sf_provider = factory.static_file_provider();
1417            let mut writer =
1418                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1419
1420            // Scratch vec for each block's single-entry changeset (cloned into each append call)
1421            let mut changeset = Vec::with_capacity(1);
1422
1423            for block_num in 0..TOTAL_BLOCKS {
1424                changeset.clear();
1425                changeset.push(AccountBeforeTx { address: make_address(block_num), info: None });
1426                writer.append_account_changeset(changeset.clone(), block_num).unwrap();
1427            }
1428
1429            writer.commit().unwrap();
1430        }
1431
1432        // Insert stale AccountsHistory entries in RocksDB for addresses that changed
1433        // in blocks 5001-14999 (i.e., blocks after the checkpoint)
1434        // These should be pruned by check_consistency
1435        for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
1436            let key = ShardedKey::new(make_address(block_num), block_num);
1437            let block_list = BlockNumberList::new_pre_sorted([block_num]);
1438            rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
1439        }
1440
1441        // Also insert some valid entries for blocks <= 5000 that should NOT be pruned
1442        for block_num in [100u64, 500, 1000, 2500, 5000] {
1443            let key = ShardedKey::new(make_address(block_num), block_num);
1444            let block_list = BlockNumberList::new_pre_sorted([block_num]);
1445            rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
1446        }
1447
1448        // Verify we have entries before healing
1449        let entries_before: usize = rocksdb.iter::<tables::AccountsHistory>().unwrap().count();
1450        let stale_count = (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize;
1451        let valid_count = 5usize;
1452        assert_eq!(
1453            entries_before,
1454            stale_count + valid_count,
1455            "Should have {} stale + {} valid entries before healing",
1456            stale_count,
1457            valid_count
1458        );
1459
1460        // Set IndexAccountHistory checkpoint to block 5000
1461        {
1462            let provider = factory.database_provider_rw().unwrap();
1463            provider
1464                .save_stage_checkpoint(
1465                    StageId::IndexAccountHistory,
1466                    StageCheckpoint::new(CHECKPOINT_BLOCK),
1467                )
1468                .unwrap();
1469            provider.commit().unwrap();
1470        }
1471
1472        let provider = factory.database_provider_ro().unwrap();
1473
1474        // Verify sf_tip > checkpoint
1475        let sf_tip = provider
1476            .static_file_provider()
1477            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
1478            .unwrap();
1479        assert_eq!(sf_tip, TOTAL_BLOCKS - 1, "Static file tip should be 14999");
1480        assert!(sf_tip > CHECKPOINT_BLOCK, "sf_tip should be > checkpoint to trigger healing");
1481
1482        // Run check_consistency - this should trigger batched changeset-based healing
1483        let result = rocksdb.check_consistency(&provider).unwrap();
1484        assert_eq!(result, None, "Healing should succeed without requiring unwind");
1485
1486        // Verify: all stale entries for blocks > 5000 should be pruned
1487        // Count remaining entries with highest_block_number > checkpoint
1488        let mut remaining_stale = 0;
1489        for result in rocksdb.iter::<tables::AccountsHistory>().unwrap() {
1490            let (key, _) = result.unwrap();
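            // `u64::MAX` is the conventional key for the open (most recent) shard; healing may
            // rewrite that shard rather than delete it, so it is excluded from the stale count.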
1491            if key.highest_block_number > CHECKPOINT_BLOCK && key.highest_block_number != u64::MAX {
1492                remaining_stale += 1;
1493            }
1494        }
1495        assert_eq!(
1496            remaining_stale, 0,
1497            "All stale entries (block > {}) should be pruned",
1498            CHECKPOINT_BLOCK
1499        );
1500    }
1501
1502    /// Tests that accounts history healing preserves entries at exactly the checkpoint block.
1503    #[test]
1504    fn test_check_consistency_accounts_history_preserves_checkpoint_block() {
1505        use reth_db::models::AccountBeforeTx;
1506        use reth_db_api::models::ShardedKey;
1507
1508        const CHECKPOINT_BLOCK: u64 = 100;
1509        const SF_TIP: u64 = 200;
1510
1511        let temp_dir = TempDir::new().unwrap();
1512        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1513
1514        let factory = create_test_provider_factory();
1515        factory.set_storage_settings_cache(StorageSettings::v2());
1516
1517        let checkpoint_addr = Address::repeat_byte(0xAA);
1518        let stale_addr = Address::repeat_byte(0xCC);
1519
1520        // Write account changesets to static files
1521        {
1522            let sf_provider = factory.static_file_provider();
1523            let mut writer =
1524                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1525
1526            for block_num in 0..=SF_TIP {
1527                let changeset = if block_num == CHECKPOINT_BLOCK {
1528                    vec![AccountBeforeTx { address: checkpoint_addr, info: None }]
1529                } else if block_num > CHECKPOINT_BLOCK {
1530                    vec![AccountBeforeTx { address: stale_addr, info: None }]
1531                } else {
1532                    vec![AccountBeforeTx { address: Address::ZERO, info: None }]
1533                };
1534                writer.append_account_changeset(changeset, block_num).unwrap();
1535            }
1536            writer.commit().unwrap();
1537        }
1538
1539        // Set checkpoint
1540        {
1541            let provider = factory.database_provider_rw().unwrap();
1542            provider
1543                .save_stage_checkpoint(
1544                    StageId::IndexAccountHistory,
1545                    StageCheckpoint::new(CHECKPOINT_BLOCK),
1546                )
1547                .unwrap();
1548            provider.commit().unwrap();
1549        }
1550
1551        // Insert entry AT the checkpoint block (should be preserved)
1552        let checkpoint_key = ShardedKey::new(checkpoint_addr, CHECKPOINT_BLOCK);
1553        let checkpoint_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]);
1554        rocksdb.put::<tables::AccountsHistory>(checkpoint_key.clone(), &checkpoint_list).unwrap();
1555
1556        // Insert stale entry AFTER the checkpoint (should be removed)
1557        let stale_key = ShardedKey::new(stale_addr, SF_TIP);
1558        let stale_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]);
1559        rocksdb.put::<tables::AccountsHistory>(stale_key.clone(), &stale_list).unwrap();
1560
1561        // Run healing
1562        let provider = factory.database_provider_ro().unwrap();
1563        let result = rocksdb.check_consistency(&provider).unwrap();
1564        assert_eq!(result, None, "Should heal without unwind");
1565
1566        // Verify checkpoint block entry is PRESERVED
1567        let preserved = rocksdb.get::<tables::AccountsHistory>(checkpoint_key).unwrap();
1568        assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");
1569
1570        // Verify the stale entry after the checkpoint is removed
1571        let stale = rocksdb.get::<tables::AccountsHistory>(stale_key).unwrap();
1572        assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
1573    }
1574
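    /// Tests that no healing is performed when the static file tip equals the
    /// `IndexStorageHistory` checkpoint: `check_consistency` returns `None` and the existing
    /// `StoragesHistory` entries are left untouched.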
1575    #[test]
1576    fn test_check_consistency_storages_history_sf_tip_equals_checkpoint_no_action() {
1577        use alloy_primitives::U256;
1578        use reth_db::models::StorageBeforeTx;
1579        use reth_static_file_types::StaticFileSegment;
1580
1581        let temp_dir = TempDir::new().unwrap();
1582        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1583
1584        // Insert StoragesHistory entries into RocksDB
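        // All indexed blocks (10-80) are at or below the checkpoint set later (100), so nothing
        // here is stale.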
1585        let key1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
1586        let key2 = StorageShardedKey::new(Address::random(), B256::random(), 80);
1587        let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
1588        let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 80]);
1589        rocksdb.put::<tables::StoragesHistory>(key1, &block_list1).unwrap();
1590        rocksdb.put::<tables::StoragesHistory>(key2, &block_list2).unwrap();
1591
1592        // Capture entries before consistency check
1593        let entries_before: Vec<_> =
1594            rocksdb.iter::<tables::StoragesHistory>().unwrap().map(|r| r.unwrap()).collect();
1595
1596        // Create a test provider factory
1597        let factory = create_test_provider_factory();
1598        factory.set_storage_settings_cache(StorageSettings::v2());
1599
1600        // Write storage changesets to static files for blocks 0-100
1601        {
1602            let sf_provider = factory.static_file_provider();
1603            let mut writer =
1604                sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1605
1606            for block_num in 0..=100u64 {
1607                let changeset = vec![StorageBeforeTx {
1608                    address: Address::ZERO,
1609                    key: B256::with_last_byte(block_num as u8),
1610                    value: U256::from(block_num),
1611                }];
1612                writer.append_storage_changeset(changeset, block_num).unwrap();
1613            }
1614            writer.commit().unwrap();
1615        }
1616
1617        // Set IndexStorageHistory checkpoint to block 100 (same as sf_tip)
1618        {
1619            let provider = factory.database_provider_rw().unwrap();
1620            provider
1621                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
1622                .unwrap();
1623            provider.commit().unwrap();
1624        }
1625
1626        let provider = factory.database_provider_ro().unwrap();
1627
1628        // Verify sf_tip equals checkpoint (both at 100)
1629        let sf_tip = provider
1630            .static_file_provider()
1631            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
1632            .unwrap();
1633        assert_eq!(sf_tip, 100, "Static file tip should be 100");
1634
1635        // Run check_consistency - should return None (no unwind needed)
1636        let result = rocksdb.check_consistency(&provider).unwrap();
1637        assert_eq!(result, None, "sf_tip == checkpoint should not require unwind");
1638
1639        // Verify NO entries are deleted - RocksDB state unchanged
1640        let entries_after: Vec<_> =
1641            rocksdb.iter::<tables::StoragesHistory>().unwrap().map(|r| r.unwrap()).collect();
1642
1643        assert_eq!(
1644            entries_after.len(),
1645            entries_before.len(),
1646            "RocksDB entry count should be unchanged when sf_tip == checkpoint"
1647        );
1648
1649        // Verify exact entries are preserved
1650        for (before, after) in entries_before.iter().zip(entries_after.iter()) {
1651            assert_eq!(before.0, after.0, "Entry key should be unchanged");
1652            assert_eq!(before.1, after.1, "Entry block list should be unchanged");
1653        }
1654    }
1655}