// reth_provider/providers/rocksdb/invariants.rs
1//! Invariant checking for `RocksDB` tables.
2//!
3//! This module provides consistency checks for tables stored in `RocksDB`, similar to the
4//! consistency checks for static files. The goal is to detect and potentially heal
5//! inconsistencies between `RocksDB` data and MDBX checkpoints.
6
7use super::RocksDBProvider;
8use crate::StaticFileProviderFactory;
9use alloy_eips::eip2718::Encodable2718;
10use alloy_primitives::BlockNumber;
11use rayon::prelude::*;
12use reth_db_api::tables;
13use reth_stages_types::StageId;
14use reth_static_file_types::StaticFileSegment;
15use reth_storage_api::{
16    BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StageCheckpointReader,
17    StorageChangeSetReader, StorageSettingsCache, TransactionsProvider,
18};
19use reth_storage_errors::provider::ProviderResult;
20use std::collections::HashSet;
21
22/// Batch size for changeset iteration during history healing.
23/// Balances memory usage against iteration overhead.
24const HEAL_HISTORY_BATCH_SIZE: u64 = 10_000;
25
26impl RocksDBProvider {
27    /// Checks consistency of `RocksDB` tables against MDBX stage checkpoints.
28    ///
29    /// Returns an unwind target block number if the pipeline needs to unwind to rebuild
30    /// `RocksDB` data. Returns `None` if all invariants pass or if inconsistencies were healed.
31    ///
32    /// # Invariants checked
33    ///
34    /// For `TransactionHashNumbers`:
35    /// - The maximum `TxNumber` value should not exceed what the `TransactionLookup` stage
36    ///   checkpoint indicates has been processed.
37    /// - If `RocksDB` is ahead, excess entries are pruned (healed).
38    /// - If `RocksDB` is behind, an unwind is required.
39    ///
40    /// For `StoragesHistory` and `AccountsHistory`:
41    /// - Uses changesets to heal stale entries when static file tip > checkpoint.
42    ///
43    /// # Requirements
44    ///
45    /// For pruning `TransactionHashNumbers`, the provider must be able to supply transaction
46    /// data (typically from static files) so that transaction hashes can be computed. This
47    /// implies that static files should be ahead of or in sync with `RocksDB`.
48    pub fn check_consistency<Provider>(
49        &self,
50        provider: &Provider,
51    ) -> ProviderResult<Option<BlockNumber>>
52    where
53        Provider: DBProvider
54            + StageCheckpointReader
55            + StorageSettingsCache
56            + StaticFileProviderFactory
57            + BlockBodyIndicesProvider
58            + StorageChangeSetReader
59            + ChangeSetReader
60            + TransactionsProvider<Transaction: Encodable2718>,
61    {
62        let mut unwind_target: Option<BlockNumber> = None;
63
64        // Heal TransactionHashNumbers if stored in RocksDB
65        if provider.cached_storage_settings().storage_v2 &&
66            let Some(target) = self.heal_transaction_hash_numbers(provider)?
67        {
68            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
69        }
70
71        // Heal StoragesHistory if stored in RocksDB
72        if provider.cached_storage_settings().storage_v2 &&
73            let Some(target) = self.heal_storages_history(provider)?
74        {
75            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
76        }
77
78        // Heal AccountsHistory if stored in RocksDB
79        if provider.cached_storage_settings().storage_v2 &&
80            let Some(target) = self.heal_accounts_history(provider)?
81        {
82            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
83        }
84
85        Ok(unwind_target)
86    }
87
88    /// Heals the `TransactionHashNumbers` table.
89    ///
90    /// - Fast path: if checkpoint == 0, clear any stale data and return
91    /// - If `sf_tip` < checkpoint, return unwind target (static files behind)
92    /// - If `sf_tip` == checkpoint, nothing to do
93    /// - If `sf_tip` > checkpoint, heal via transaction ranges in batches
94    fn heal_transaction_hash_numbers<Provider>(
95        &self,
96        provider: &Provider,
97    ) -> ProviderResult<Option<BlockNumber>>
98    where
99        Provider: DBProvider
100            + StageCheckpointReader
101            + StaticFileProviderFactory
102            + BlockBodyIndicesProvider
103            + TransactionsProvider<Transaction: Encodable2718>,
104    {
105        let checkpoint = provider
106            .get_stage_checkpoint(StageId::TransactionLookup)?
107            .map(|cp| cp.block_number)
108            .unwrap_or(0);
109
110        let sf_tip = provider
111            .static_file_provider()
112            .get_highest_static_file_block(StaticFileSegment::Transactions)
113            .unwrap_or(0);
114
115        // Fast path: clear any stale data and return.
116        if checkpoint == 0 {
117            tracing::info!(
118                target: "reth::providers::rocksdb",
119                "TransactionHashNumbers: checkpoint is 0, clearing stale data"
120            );
121            self.clear::<tables::TransactionHashNumbers>()?;
122            return Ok(None);
123        }
124
125        if sf_tip < checkpoint {
126            // This should never happen in normal operation - static files are always committed
127            // before RocksDB. If we get here, something is seriously wrong. The unwind is a
128            // best-effort attempt but is probably futile.
129            tracing::warn!(
130                target: "reth::providers::rocksdb",
131                sf_tip,
132                checkpoint,
133                "TransactionHashNumbers: static file tip behind checkpoint, unwind needed"
134            );
135            return Ok(Some(sf_tip));
136        }
137
138        // sf_tip == checkpoint - nothing to do
139        if sf_tip == checkpoint {
140            return Ok(None);
141        }
142
143        // Get end tx from static files (authoritative for sf_tip)
144        let sf_tip_end_tx = provider
145            .static_file_provider()
146            .get_highest_static_file_tx(StaticFileSegment::Transactions)
147            .unwrap_or(0);
148
149        // Get the first tx after the checkpoint block from MDBX (authoritative up to checkpoint)
150        let checkpoint_next_tx = provider
151            .block_body_indices(checkpoint)?
152            .map(|indices| indices.next_tx_num())
153            .unwrap_or(0);
154
155        if sf_tip_end_tx < checkpoint_next_tx {
156            // This should never happen in normal operation - static files should have all
157            // transactions up to sf_tip. If we get here, something is seriously wrong.
158            // The unwind is a best-effort attempt but is probably futile.
159            tracing::warn!(
160                target: "reth::providers::rocksdb",
161                sf_tip_end_tx,
162                checkpoint_next_tx,
163                checkpoint,
164                sf_tip,
165                "TransactionHashNumbers: static file tx tip behind checkpoint, unwind needed"
166            );
167            return Ok(Some(sf_tip));
168        }
169
170        tracing::info!(
171            target: "reth::providers::rocksdb",
172            checkpoint,
173            sf_tip,
174            checkpoint_next_tx,
175            sf_tip_end_tx,
176            "TransactionHashNumbers: healing via transaction ranges"
177        );
178
179        const BATCH_SIZE: u64 = 10_000;
180        let mut batch_start = checkpoint_next_tx;
181
182        while batch_start <= sf_tip_end_tx {
183            let batch_end = batch_start.saturating_add(BATCH_SIZE - 1).min(sf_tip_end_tx);
184
185            tracing::debug!(
186                target: "reth::providers::rocksdb",
187                batch_start,
188                batch_end,
189                "Pruning TransactionHashNumbers batch"
190            );
191
192            self.prune_transaction_hash_numbers_in_range(provider, batch_start..=batch_end)?;
193
194            batch_start = batch_end.saturating_add(1);
195        }
196
197        Ok(None)
198    }
199
200    /// Prunes `TransactionHashNumbers` entries for transactions in the given range.
201    ///
202    /// This fetches transactions from the provider, computes their hashes in parallel,
203    /// and deletes the corresponding entries from `RocksDB` by key. This approach is more
204    /// scalable than iterating all rows because it only processes the transactions that
205    /// need to be pruned.
206    ///
207    /// # Requirements
208    ///
209    /// The provider must be able to supply transaction data (typically from static files)
210    /// so that transaction hashes can be computed. This implies that static files should
211    /// be ahead of or in sync with `RocksDB`.
212    fn prune_transaction_hash_numbers_in_range<Provider>(
213        &self,
214        provider: &Provider,
215        tx_range: std::ops::RangeInclusive<u64>,
216    ) -> ProviderResult<()>
217    where
218        Provider: TransactionsProvider<Transaction: Encodable2718>,
219    {
220        if tx_range.is_empty() {
221            return Ok(());
222        }
223
224        // Fetch transactions in the range and compute their hashes in parallel
225        let hashes: Vec<_> = provider
226            .transactions_by_tx_range(tx_range.clone())?
227            .into_par_iter()
228            .map(|tx| tx.trie_hash())
229            .collect();
230
231        if !hashes.is_empty() {
232            tracing::info!(
233                target: "reth::providers::rocksdb",
234                deleted_count = hashes.len(),
235                tx_range_start = *tx_range.start(),
236                tx_range_end = *tx_range.end(),
237                "Pruning TransactionHashNumbers entries by tx range"
238            );
239
240            let mut batch = self.batch();
241            for hash in hashes {
242                batch.delete::<tables::TransactionHashNumbers>(hash)?;
243            }
244            batch.commit()?;
245        }
246
247        Ok(())
248    }
249
250    /// Heals the `StoragesHistory` table by removing stale entries.
251    ///
252    /// Returns an unwind target if static file tip is behind checkpoint (cannot heal).
253    /// Otherwise iterates changesets in batches to identify and unwind affected keys.
254    fn heal_storages_history<Provider>(
255        &self,
256        provider: &Provider,
257    ) -> ProviderResult<Option<BlockNumber>>
258    where
259        Provider:
260            DBProvider + StageCheckpointReader + StaticFileProviderFactory + StorageChangeSetReader,
261    {
262        let checkpoint = provider
263            .get_stage_checkpoint(StageId::IndexStorageHistory)?
264            .map(|cp| cp.block_number)
265            .unwrap_or(0);
266
267        // Fast path: clear any stale data and return.
268        if checkpoint == 0 {
269            tracing::info!(
270                target: "reth::providers::rocksdb",
271                "StoragesHistory: checkpoint is 0, clearing stale data"
272            );
273            self.clear::<tables::StoragesHistory>()?;
274            return Ok(None);
275        }
276
277        let sf_tip = provider
278            .static_file_provider()
279            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
280            .unwrap_or(0);
281
282        if sf_tip < checkpoint {
283            // This should never happen in normal operation - static files are always
284            // committed before RocksDB. If we get here, something is seriously wrong.
285            // The unwind is a best-effort attempt but is probably futile.
286            tracing::warn!(
287                target: "reth::providers::rocksdb",
288                sf_tip,
289                checkpoint,
290                "StoragesHistory: static file tip behind checkpoint, unwind needed"
291            );
292            return Ok(Some(sf_tip));
293        }
294
295        if sf_tip == checkpoint {
296            return Ok(None);
297        }
298
299        let total_blocks = sf_tip - checkpoint;
300        tracing::info!(
301            target: "reth::providers::rocksdb",
302            checkpoint,
303            sf_tip,
304            total_blocks,
305            "StoragesHistory: healing via changesets"
306        );
307
308        let mut batch_start = checkpoint + 1;
309        let mut batch_num = 0u64;
310        let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
311
312        while batch_start <= sf_tip {
313            let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
314            batch_num += 1;
315
316            let changesets = provider.storage_changesets_range(batch_start..=batch_end)?;
317
318            let unique_keys: HashSet<_> = changesets
319                .into_iter()
320                .map(|(block_addr, entry)| (block_addr.address(), entry.key, checkpoint + 1))
321                .collect();
322            let indices: Vec<_> = unique_keys.into_iter().collect();
323
324            if !indices.is_empty() {
325                tracing::info!(
326                    target: "reth::providers::rocksdb",
327                    batch_num,
328                    total_batches,
329                    batch_start,
330                    batch_end,
331                    indices_count = indices.len(),
332                    "StoragesHistory: unwinding batch"
333                );
334
335                let batch = self.unwind_storage_history_indices(&indices)?;
336                self.commit_batch(batch)?;
337            }
338
339            batch_start = batch_end + 1;
340        }
341
342        Ok(None)
343    }
344
345    /// Heals the `AccountsHistory` table by removing stale entries.
346    ///
347    /// Returns an unwind target if static file tip is behind checkpoint (cannot heal).
348    /// Otherwise iterates changesets in batches to identify and unwind affected keys.
349    fn heal_accounts_history<Provider>(
350        &self,
351        provider: &Provider,
352    ) -> ProviderResult<Option<BlockNumber>>
353    where
354        Provider: DBProvider + StageCheckpointReader + StaticFileProviderFactory + ChangeSetReader,
355    {
356        let checkpoint = provider
357            .get_stage_checkpoint(StageId::IndexAccountHistory)?
358            .map(|cp| cp.block_number)
359            .unwrap_or(0);
360
361        // Fast path: clear any stale data and return.
362        if checkpoint == 0 {
363            tracing::info!(
364                target: "reth::providers::rocksdb",
365                "AccountsHistory: checkpoint is 0, clearing stale data"
366            );
367            self.clear::<tables::AccountsHistory>()?;
368            return Ok(None);
369        }
370
371        let sf_tip = provider
372            .static_file_provider()
373            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
374            .unwrap_or(0);
375
376        if sf_tip < checkpoint {
377            // This should never happen in normal operation - static files are always
378            // committed before RocksDB. If we get here, something is seriously wrong.
379            // The unwind is a best-effort attempt but is probably futile.
380            tracing::warn!(
381                target: "reth::providers::rocksdb",
382                sf_tip,
383                checkpoint,
384                "AccountsHistory: static file tip behind checkpoint, unwind needed"
385            );
386            return Ok(Some(sf_tip));
387        }
388
389        if sf_tip == checkpoint {
390            return Ok(None);
391        }
392
393        let total_blocks = sf_tip - checkpoint;
394        tracing::info!(
395            target: "reth::providers::rocksdb",
396            checkpoint,
397            sf_tip,
398            total_blocks,
399            "AccountsHistory: healing via changesets"
400        );
401
402        let mut batch_start = checkpoint + 1;
403        let mut batch_num = 0u64;
404        let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
405
406        while batch_start <= sf_tip {
407            let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
408            batch_num += 1;
409
410            let changesets = provider.account_changesets_range(batch_start..=batch_end)?;
411
412            let mut addresses = HashSet::with_capacity(changesets.len());
413            addresses.extend(changesets.iter().map(|(_, cs)| cs.address));
414            let unwind_from = checkpoint + 1;
415            let indices: Vec<_> = addresses.into_iter().map(|addr| (addr, unwind_from)).collect();
416
417            if !indices.is_empty() {
418                tracing::info!(
419                    target: "reth::providers::rocksdb",
420                    batch_num,
421                    total_batches,
422                    batch_start,
423                    batch_end,
424                    indices_count = indices.len(),
425                    "AccountsHistory: unwinding batch"
426                );
427
428                let batch = self.unwind_account_history_indices(&indices)?;
429                self.commit_batch(batch)?;
430            }
431
432            batch_start = batch_end + 1;
433        }
434
435        Ok(None)
436    }
437}
438
439#[cfg(test)]
440mod tests {
441    use super::*;
442    use crate::{
443        providers::{rocksdb::RocksDBBuilder, static_file::StaticFileWriter},
444        test_utils::create_test_provider_factory,
445        BlockWriter, DatabaseProviderFactory, StageCheckpointWriter, TransactionsProvider,
446    };
447    use alloy_primitives::{Address, B256};
448    use reth_db::cursor::{DbCursorRO, DbCursorRW};
449    use reth_db_api::{
450        models::{storage_sharded_key::StorageShardedKey, StorageSettings},
451        tables::{self, BlockNumberList},
452        transaction::DbTxMut,
453    };
454    use reth_stages_types::StageCheckpoint;
455    use reth_testing_utils::generators::{self, BlockRangeParams};
456    use tempfile::TempDir;
457
458    #[test]
459    fn test_first_last_empty_rocksdb() {
460        let temp_dir = TempDir::new().unwrap();
461        let provider = RocksDBBuilder::new(temp_dir.path())
462            .with_table::<tables::TransactionHashNumbers>()
463            .with_table::<tables::StoragesHistory>()
464            .build()
465            .unwrap();
466
467        // Empty RocksDB, no checkpoints - should be consistent
468        let first = provider.first::<tables::TransactionHashNumbers>().unwrap();
469        let last = provider.last::<tables::TransactionHashNumbers>().unwrap();
470
471        assert!(first.is_none());
472        assert!(last.is_none());
473    }
474
475    #[test]
476    fn test_first_last_with_data() {
477        let temp_dir = TempDir::new().unwrap();
478        let provider = RocksDBBuilder::new(temp_dir.path())
479            .with_table::<tables::TransactionHashNumbers>()
480            .build()
481            .unwrap();
482
483        // Insert some data
484        let tx_hash = B256::from([1u8; 32]);
485        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
486
487        // RocksDB has data
488        let last = provider.last::<tables::TransactionHashNumbers>().unwrap();
489        assert!(last.is_some());
490        assert_eq!(last.unwrap().1, 100);
491    }
492
    /// An empty `RocksDB` together with unset stage checkpoints is consistent:
    /// `check_consistency` must report no unwind target.
    #[test]
    fn test_check_consistency_empty_rocksdb_no_checkpoint_is_ok() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = factory.database_provider_ro().unwrap();

        // Empty RocksDB and no checkpoints - should be consistent (None = no unwind needed)
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None);
    }
508
    /// Tests that `checkpoint=0` with empty `RocksDB` returns early without attempting
    /// an expensive healing loop. Previously, when `sf_tip` > `checkpoint=0`, the healer
    /// would iterate billions of transactions from static files for no effect, causing
    /// the node to hang on startup with MDBX read transaction timeouts.
    #[test]
    fn test_check_consistency_checkpoint_zero_empty_rocksdb_returns_early() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // No checkpoints set — all default to 0 via unwrap_or(0).
        // RocksDB tables are empty.
        let provider = factory.database_provider_ro().unwrap();

        // Each healer must take its checkpoint-0 fast path and leave its table empty.
        let result = rocksdb.heal_transaction_hash_numbers(&provider).unwrap();
        assert_eq!(result, None, "TransactionHashNumbers should return early at checkpoint 0");
        assert!(rocksdb.first::<tables::TransactionHashNumbers>().unwrap().is_none());

        let result = rocksdb.heal_storages_history(&provider).unwrap();
        assert_eq!(result, None, "StoragesHistory should return early at checkpoint 0");
        assert!(rocksdb.first::<tables::StoragesHistory>().unwrap().is_none());

        let result = rocksdb.heal_accounts_history(&provider).unwrap();
        assert_eq!(result, None, "AccountsHistory should return early at checkpoint 0");
        assert!(rocksdb.first::<tables::AccountsHistory>().unwrap().is_none());
    }
537
    /// A `TransactionLookup` checkpoint with no static-file data behind it means the
    /// static file tip (0) is behind the checkpoint (100), which must trigger an unwind
    /// to the static file tip.
    #[test]
    fn test_check_consistency_empty_rocksdb_with_checkpoint_is_first_run() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // Set a checkpoint indicating we should have processed up to block 100
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB is empty but checkpoint says block 100 was processed.
        // Since static file tip defaults to 0 when None, and 0 < 100, an unwind is triggered.
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(0), "Static file tip (0) behind checkpoint (100) triggers unwind");
    }
563
    /// Tests that when checkpoint=0 and `RocksDB` has data, all entries are pruned.
    /// This simulates a crash recovery scenario where the checkpoint was lost.
    #[test]
    fn test_check_consistency_checkpoint_zero_with_rocksdb_data_prunes_all() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // Generate blocks with real transactions and insert them
        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=2,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        // Insert blocks into MDBX/static files and mirror each tx hash into RocksDB.
        let mut tx_hashes = Vec::new();
        {
            let provider = factory.database_provider_rw().unwrap();
            let mut tx_count = 0u64;
            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    tx_hashes.push(hash);
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }
            provider.commit().unwrap();
        }

        // Explicitly clear the checkpoints to simulate crash recovery
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(0))
                .unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
                .unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
                .unwrap();
            provider.commit().unwrap();
        }

        // Verify RocksDB data exists
        assert!(rocksdb.last::<tables::TransactionHashNumbers>().unwrap().is_some());

        let provider = factory.database_provider_ro().unwrap();

        // checkpoint = 0 but RocksDB has data.
        // This means RocksDB has stale data that should be cleared.
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by clearing, no unwind needed");

        // Verify data was cleared
        for hash in &tx_hashes {
            assert!(
                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
                "RocksDB should be empty after pruning"
            );
        }
    }
633
    /// An `IndexStorageHistory` checkpoint with no changeset static files behind it
    /// (sf_tip=0 < checkpoint=100) must yield an unwind target of the static file tip.
    #[test]
    fn test_check_consistency_storages_history_empty_with_checkpoint_is_first_run() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // Set a checkpoint indicating we should have processed up to block 100
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB is empty but checkpoint says block 100 was processed.
        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
        // This should never happen in normal operation.
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
    }
660
    /// `StoragesHistory` rows with a zero checkpoint are stale and must be cleared by
    /// the checkpoint-0 fast path, without requesting an unwind.
    #[test]
    fn test_check_consistency_storages_history_has_data_no_checkpoint_prunes_data() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        // Insert data into RocksDB
        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
        rocksdb.put::<tables::StoragesHistory>(key, &block_list).unwrap();

        // Verify data exists
        assert!(rocksdb.last::<tables::StoragesHistory>().unwrap().is_some());

        // Create a test provider factory for MDBX with NO checkpoint
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB has data but checkpoint is 0
        // This means RocksDB has stale data that should be pruned (healed)
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        // Verify data was pruned
        assert!(
            rocksdb.last::<tables::StoragesHistory>().unwrap().is_none(),
            "RocksDB should be empty after pruning"
        );
    }
    /// When the `TransactionLookup` checkpoint (10) is ahead of the static file tip (2),
    /// `check_consistency` must request an unwind down to the static file tip.
    #[test]
    fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // Generate blocks with real transactions (blocks 0-2, 6 transactions total)
        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=2,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        // Insert blocks and mirror each tx hash into RocksDB.
        {
            let provider = factory.database_provider_rw().unwrap();
            let mut tx_count = 0u64;
            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }
            provider.commit().unwrap();
        }

        // Set checkpoint to block 10 (beyond our actual data at block 2)
        // sf_tip is at block 2, checkpoint is at block 10
        // Since sf_tip < checkpoint, we need to unwind to sf_tip
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(10))
                .unwrap();
            // Reset history checkpoints so they don't interfere
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
                .unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // sf_tip (2) < checkpoint (10), so unwind to sf_tip is needed
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(2), "sf_tip < checkpoint requires unwind to sf_tip");
    }
747
    /// Ensures `check_consistency` heals (prunes) excess `TransactionHashNumbers` entries when
    /// `RocksDB` is ahead of the `TransactionLookup` checkpoint after a simulated MDBX unwind.
    #[test]
    fn test_check_consistency_rocksdb_ahead_of_checkpoint_prunes_excess() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // Generate blocks with real transactions:
        // Blocks 0-5, each with 2 transactions = 12 total transactions (0-11)
        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=5,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        // Track which hashes belong to which blocks
        let mut tx_hashes = Vec::new();
        let mut tx_count = 0u64;
        {
            let provider = factory.database_provider_rw().unwrap();
            // Insert ALL blocks (0-5) to write transactions to static files
            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();
                // Mirror every tx hash -> tx number into RocksDB, in insertion order
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    tx_hashes.push(hash);
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }
            provider.commit().unwrap();
        }

        // Simulate crash recovery scenario:
        // MDBX was unwound to block 2, but RocksDB and static files still have more data.
        // Remove TransactionBlocks entries for blocks 3-5 to simulate MDBX unwind.
        {
            let provider = factory.database_provider_rw().unwrap();
            // Delete TransactionBlocks entries for tx > 5 (i.e., for blocks 3-5)
            // TransactionBlocks maps last_tx_in_block -> block_number
            // After unwind, only entries for blocks 0-2 should remain (tx 5 -> block 2)
            let mut cursor = provider.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();
            // Walk and delete entries where block > 2
            // (collect first: the walker borrows the cursor, so we can't delete while walking)
            let mut to_delete = Vec::new();
            let mut walker = cursor.walk(Some(0)).unwrap();
            while let Some((tx_num, block_num)) = walker.next().transpose().unwrap() {
                if block_num > 2 {
                    to_delete.push(tx_num);
                }
            }
            // Release the walker's borrow of the cursor before mutating through it
            drop(walker);
            for tx_num in to_delete {
                cursor.seek_exact(tx_num).unwrap();
                cursor.delete_current().unwrap();
            }

            // Set checkpoint to block 2
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
                .unwrap();
            // Reset history checkpoints so they don't interfere
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(0))
                .unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(0))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB has tx hashes for all blocks (0-5)
        // MDBX TransactionBlocks only goes up to tx 5 (block 2)
        // Static files have data for all txs (0-11)
        // This means RocksDB is ahead and should prune entries for tx 6-11
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        // Verify: hashes for blocks 0-2 (tx 0-5) should remain, blocks 3-5 (tx 6-11) should be
        // pruned.
        // First 6 hashes should remain
        for (i, hash) in tx_hashes.iter().take(6).enumerate() {
            assert!(
                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_some(),
                "tx {} should remain",
                i
            );
        }
        // Last 6 hashes should be pruned
        for (i, hash) in tx_hashes.iter().skip(6).enumerate() {
            assert!(
                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
                "tx {} should be pruned",
                i + 6
            );
        }
    }
850
851    #[test]
852    fn test_check_consistency_storages_history_sentinel_only_with_checkpoint_is_first_run() {
853        let temp_dir = TempDir::new().unwrap();
854        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
855
856        // Insert ONLY sentinel entries (highest_block_number = u64::MAX)
857        // This simulates a scenario where history tracking started but no shards were completed
858        let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
859        let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
860        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
861        rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
862        rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
863
864        // Verify entries exist (not empty table)
865        assert!(rocksdb.first::<tables::StoragesHistory>().unwrap().is_some());
866
867        // Create a test provider factory for MDBX
868        let factory = create_test_provider_factory();
869        factory.set_storage_settings_cache(StorageSettings::v2());
870
871        // Set a checkpoint indicating we should have processed up to block 100
872        {
873            let provider = factory.database_provider_rw().unwrap();
874            provider
875                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
876                .unwrap();
877            provider.commit().unwrap();
878        }
879
880        let provider = factory.database_provider_ro().unwrap();
881
882        // RocksDB has only sentinel entries but checkpoint is set.
883        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
884        // This should never happen in normal operation.
885        let result = rocksdb.check_consistency(&provider).unwrap();
886        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
887    }
888
889    #[test]
890    fn test_check_consistency_accounts_history_sentinel_only_with_checkpoint_is_first_run() {
891        use reth_db_api::models::ShardedKey;
892
893        let temp_dir = TempDir::new().unwrap();
894        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
895
896        // Insert ONLY sentinel entries (highest_block_number = u64::MAX)
897        let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
898        let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
899        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
900        rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
901        rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
902
903        // Verify entries exist (not empty table)
904        assert!(rocksdb.first::<tables::AccountsHistory>().unwrap().is_some());
905
906        // Create a test provider factory for MDBX
907        let factory = create_test_provider_factory();
908        factory.set_storage_settings_cache(StorageSettings::v2());
909
910        // Set a checkpoint indicating we should have processed up to block 100
911        {
912            let provider = factory.database_provider_rw().unwrap();
913            provider
914                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
915                .unwrap();
916            provider.commit().unwrap();
917        }
918
919        let provider = factory.database_provider_ro().unwrap();
920
921        // RocksDB has only sentinel entries but checkpoint is set.
922        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
923        // This should never happen in normal operation.
924        let result = rocksdb.check_consistency(&provider).unwrap();
925        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
926    }
927
    /// Test that pruning works by fetching transactions and computing their hashes,
    /// rather than iterating all rows. This test uses random blocks with unique
    /// transactions so we can verify the correct entries are pruned.
    #[test]
    fn test_prune_transaction_hash_numbers_by_range() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // Generate random blocks with unique transactions.
        // `tx_count: 2..3` samples exactly 2 transactions for every block in 0..=5 — including
        // block 0, as in the other tests in this module — giving 12 transactions total (0-11).
        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=5,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        // Insert blocks into the database
        let mut tx_count = 0u64;
        let mut tx_hashes = Vec::new();
        {
            let provider = factory.database_provider_rw().unwrap();

            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();

                // Store transaction hash -> tx_number mappings in RocksDB
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    tx_hashes.push(hash);
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }

            // Set checkpoint to block 2 (meaning we should only have tx hashes for blocks 0-2)
            // Blocks 0, 1, 2 have 6 transactions (2 each), so tx 0-5 should remain
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
                .unwrap();
            provider.commit().unwrap();
        }

        // At this point:
        // - RocksDB has tx hashes for blocks 0-5 (12 total: 2 per block)
        // - Checkpoint says we only processed up to block 2
        // - We need to prune tx hashes for blocks 3, 4, 5 (tx 6-11)

        // Verify RocksDB has the expected number of entries before pruning
        let rocksdb_count_before: usize =
            rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().count();
        assert_eq!(
            rocksdb_count_before, tx_count as usize,
            "RocksDB should have all {} transaction hashes before pruning",
            tx_count
        );

        let provider = factory.database_provider_ro().unwrap();

        // Verify we can fetch transactions by tx range
        let all_txs = provider.transactions_by_tx_range(0..tx_count).unwrap();
        assert_eq!(all_txs.len(), tx_count as usize, "Should be able to fetch all transactions");

        // Verify the hashes match between what we stored and what we compute from fetched txs
        for (i, tx) in all_txs.iter().enumerate() {
            let computed_hash = tx.trie_hash();
            assert_eq!(
                computed_hash, tx_hashes[i],
                "Hash mismatch for tx {}: stored {:?} vs computed {:?}",
                i, tx_hashes[i], computed_hash
            );
        }

        // Blocks 0, 1, 2 have 2 tx each = 6 tx total (indices 0-5)
        // We want to keep tx 0-5, prune tx 6-11
        let max_tx_to_keep = 5u64;
        let tx_to_prune_start = max_tx_to_keep + 1;

        // Prune transactions 6-11 (blocks 3-5)
        rocksdb
            .prune_transaction_hash_numbers_in_range(&provider, tx_to_prune_start..=(tx_count - 1))
            .expect("prune should succeed");

        // Verify: transactions 0-5 should remain, 6-11 should be pruned
        let mut remaining_count = 0;
        for result in rocksdb.iter::<tables::TransactionHashNumbers>().unwrap() {
            let (_hash, tx_num) = result.unwrap();
            assert!(
                tx_num <= max_tx_to_keep,
                "Transaction {} should have been pruned (> {})",
                tx_num,
                max_tx_to_keep
            );
            remaining_count += 1;
        }
        assert_eq!(
            remaining_count,
            (max_tx_to_keep + 1) as usize,
            "Should have {} transactions (0-{})",
            max_tx_to_keep + 1,
            max_tx_to_keep
        );
    }
1041
1042    #[test]
1043    fn test_check_consistency_accounts_history_empty_with_checkpoint_is_first_run() {
1044        let temp_dir = TempDir::new().unwrap();
1045        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1046
1047        // Create a test provider factory for MDBX
1048        let factory = create_test_provider_factory();
1049        factory.set_storage_settings_cache(StorageSettings::v2());
1050
1051        // Set a checkpoint indicating we should have processed up to block 100
1052        {
1053            let provider = factory.database_provider_rw().unwrap();
1054            provider
1055                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
1056                .unwrap();
1057            provider.commit().unwrap();
1058        }
1059
1060        let provider = factory.database_provider_ro().unwrap();
1061
1062        // RocksDB is empty but checkpoint says block 100 was processed.
1063        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
1064        // This should never happen in normal operation.
1065        let result = rocksdb.check_consistency(&provider).unwrap();
1066        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
1067    }
1068
1069    #[test]
1070    fn test_check_consistency_accounts_history_has_data_no_checkpoint_prunes_data() {
1071        use reth_db_api::models::ShardedKey;
1072
1073        let temp_dir = TempDir::new().unwrap();
1074        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1075
1076        // Insert data into RocksDB
1077        let key = ShardedKey::new(Address::ZERO, 50);
1078        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
1079        rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
1080
1081        // Verify data exists
1082        assert!(rocksdb.last::<tables::AccountsHistory>().unwrap().is_some());
1083
1084        // Create a test provider factory for MDBX with NO checkpoint
1085        let factory = create_test_provider_factory();
1086        factory.set_storage_settings_cache(StorageSettings::v2());
1087
1088        let provider = factory.database_provider_ro().unwrap();
1089
1090        // RocksDB has data but checkpoint is 0
1091        // This means RocksDB has stale data that should be pruned (healed)
1092        let result = rocksdb.check_consistency(&provider).unwrap();
1093        assert_eq!(result, None, "Should heal by pruning, no unwind needed");
1094
1095        // Verify data was pruned
1096        assert!(
1097            rocksdb.last::<tables::AccountsHistory>().unwrap().is_none(),
1098            "RocksDB should be empty after pruning"
1099        );
1100    }
1101
1102    #[test]
1103    fn test_check_consistency_accounts_history_sf_tip_equals_checkpoint_no_action() {
1104        use reth_db::models::AccountBeforeTx;
1105        use reth_db_api::models::ShardedKey;
1106        use reth_static_file_types::StaticFileSegment;
1107
1108        let temp_dir = TempDir::new().unwrap();
1109        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1110
1111        // Insert some AccountsHistory entries with various highest_block_numbers
1112        let key1 = ShardedKey::new(Address::ZERO, 50);
1113        let key2 = ShardedKey::new(Address::random(), 75);
1114        let key3 = ShardedKey::new(Address::random(), u64::MAX); // sentinel
1115        let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
1116        let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 75]);
1117        let block_list3 = BlockNumberList::new_pre_sorted([80, 90, 100]);
1118        rocksdb.put::<tables::AccountsHistory>(key1, &block_list1).unwrap();
1119        rocksdb.put::<tables::AccountsHistory>(key2, &block_list2).unwrap();
1120        rocksdb.put::<tables::AccountsHistory>(key3, &block_list3).unwrap();
1121
1122        // Capture RocksDB state before consistency check
1123        let entries_before: Vec<_> =
1124            rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|r| r.unwrap()).collect();
1125        assert_eq!(entries_before.len(), 3, "Should have 3 entries before check");
1126
1127        // Create a test provider factory for MDBX
1128        let factory = create_test_provider_factory();
1129        factory.set_storage_settings_cache(StorageSettings::v2());
1130
1131        // Write account changesets to static files for blocks 0-100
1132        {
1133            let sf_provider = factory.static_file_provider();
1134            let mut writer =
1135                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1136
1137            for block_num in 0..=100 {
1138                let changeset = vec![AccountBeforeTx { address: Address::random(), info: None }];
1139                writer.append_account_changeset(changeset, block_num).unwrap();
1140            }
1141
1142            writer.commit().unwrap();
1143        }
1144
1145        // Set IndexAccountHistory checkpoint to block 100 (same as sf_tip)
1146        {
1147            let provider = factory.database_provider_rw().unwrap();
1148            provider
1149                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
1150                .unwrap();
1151            provider.commit().unwrap();
1152        }
1153
1154        let provider = factory.database_provider_ro().unwrap();
1155
1156        // Verify sf_tip equals checkpoint (both at 100)
1157        let sf_tip = provider
1158            .static_file_provider()
1159            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
1160            .unwrap();
1161        assert_eq!(sf_tip, 100, "Static file tip should be 100");
1162
1163        // Run check_consistency - should return None (no unwind needed)
1164        let result = rocksdb.check_consistency(&provider).unwrap();
1165        assert_eq!(result, None, "sf_tip == checkpoint should not require unwind");
1166
1167        // Verify NO entries are deleted - RocksDB state unchanged
1168        let entries_after: Vec<_> =
1169            rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|r| r.unwrap()).collect();
1170
1171        assert_eq!(
1172            entries_after.len(),
1173            entries_before.len(),
1174            "RocksDB entry count should be unchanged when sf_tip == checkpoint"
1175        );
1176
1177        // Verify exact entries are preserved
1178        for (before, after) in entries_before.iter().zip(entries_after.iter()) {
1179            assert_eq!(before.0.key, after.0.key, "Entry key should be unchanged");
1180            assert_eq!(
1181                before.0.highest_block_number, after.0.highest_block_number,
1182                "Entry highest_block_number should be unchanged"
1183            );
1184            assert_eq!(before.1, after.1, "Entry block list should be unchanged");
1185        }
1186    }
1187
1188    /// Tests `StoragesHistory` changeset-based healing with enough blocks to trigger batching.
1189    ///
1190    /// Scenario:
1191    /// 1. Generate 15,000 blocks worth of storage changeset data (to exceed the 10k batch size)
1192    /// 2. Each block has 1 storage change (address + slot + value)
1193    /// 3. Write storage changesets to static files for all 15k blocks
1194    /// 4. Set `IndexStorageHistory` checkpoint to block 5000
1195    /// 5. Insert stale `StoragesHistory` entries in `RocksDB` for (address, slot) pairs that
1196    ///    changed in blocks 5001-15000
1197    /// 6. Run `check_consistency`
1198    /// 7. Verify stale entries for blocks > 5000 are pruned and batching worked
1199    #[test]
1200    fn test_check_consistency_storages_history_heals_via_changesets_large_range() {
1201        use alloy_primitives::U256;
1202        use reth_db_api::models::StorageBeforeTx;
1203
1204        const TOTAL_BLOCKS: u64 = 15_000;
1205        const CHECKPOINT_BLOCK: u64 = 5_000;
1206
1207        let temp_dir = TempDir::new().unwrap();
1208        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1209
1210        let factory = create_test_provider_factory();
1211        factory.set_storage_settings_cache(StorageSettings::v2());
1212
1213        // Helper to generate address from block number (reuses stack arrays)
1214        #[inline]
1215        fn make_address(block_num: u64) -> Address {
1216            let mut addr_bytes = [0u8; 20];
1217            addr_bytes[0..8].copy_from_slice(&block_num.to_le_bytes());
1218            Address::from(addr_bytes)
1219        }
1220
1221        // Helper to generate slot from block number (reuses stack arrays)
1222        #[inline]
1223        fn make_slot(block_num: u64) -> B256 {
1224            let mut slot_bytes = [0u8; 32];
1225            slot_bytes[0..8].copy_from_slice(&block_num.to_le_bytes());
1226            B256::from(slot_bytes)
1227        }
1228
1229        // Write storage changesets to static files for 15k blocks.
1230        // Each block has 1 storage change with a unique (address, slot) pair.
1231        {
1232            let sf_provider = factory.static_file_provider();
1233            let mut writer =
1234                sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1235
1236            // Reuse changeset vec to avoid repeated allocations
1237            let mut changeset = Vec::with_capacity(1);
1238
1239            for block_num in 0..TOTAL_BLOCKS {
1240                changeset.clear();
1241                changeset.push(StorageBeforeTx {
1242                    address: make_address(block_num),
1243                    key: make_slot(block_num),
1244                    value: U256::from(block_num),
1245                });
1246
1247                writer.append_storage_changeset(changeset.clone(), block_num).unwrap();
1248            }
1249
1250            writer.commit().unwrap();
1251        }
1252
1253        // Verify static files have data up to block 14999
1254        {
1255            let sf_provider = factory.static_file_provider();
1256            let highest = sf_provider
1257                .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
1258                .unwrap();
1259            assert_eq!(highest, TOTAL_BLOCKS - 1, "Static files should have blocks 0..14999");
1260        }
1261
1262        // Set IndexStorageHistory checkpoint to block 5000
1263        {
1264            let provider = factory.database_provider_rw().unwrap();
1265            provider
1266                .save_stage_checkpoint(
1267                    StageId::IndexStorageHistory,
1268                    StageCheckpoint::new(CHECKPOINT_BLOCK),
1269                )
1270                .unwrap();
1271            provider.commit().unwrap();
1272        }
1273
1274        // Insert stale StoragesHistory entries for blocks 5001-14999
1275        // These are (address, slot) pairs that changed after the checkpoint
1276        for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
1277            let key =
1278                StorageShardedKey::new(make_address(block_num), make_slot(block_num), block_num);
1279            let block_list = BlockNumberList::new_pre_sorted([block_num]);
1280            rocksdb.put::<tables::StoragesHistory>(key, &block_list).unwrap();
1281        }
1282
1283        // Verify RocksDB has stale entries before healing
1284        let count_before: usize = rocksdb.iter::<tables::StoragesHistory>().unwrap().count();
1285        assert_eq!(
1286            count_before,
1287            (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize,
1288            "Should have {} stale entries before healing",
1289            TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1
1290        );
1291
1292        // Run check_consistency - this should heal by pruning stale entries
1293        let provider = factory.database_provider_ro().unwrap();
1294        let result = rocksdb.check_consistency(&provider).unwrap();
1295        assert_eq!(result, None, "Should heal via changesets, no unwind needed");
1296
1297        // Verify all stale entries were pruned
1298        // After healing, entries with highest_block_number > checkpoint should be gone
1299        let mut remaining_stale = 0;
1300        for result in rocksdb.iter::<tables::StoragesHistory>().unwrap() {
1301            let (key, _) = result.unwrap();
1302            if key.sharded_key.highest_block_number > CHECKPOINT_BLOCK {
1303                remaining_stale += 1;
1304            }
1305        }
1306        assert_eq!(
1307            remaining_stale, 0,
1308            "All stale entries (block > {}) should be pruned",
1309            CHECKPOINT_BLOCK
1310        );
1311    }
1312
1313    /// Tests that healing preserves entries at exactly the checkpoint block.
1314    ///
1315    /// This catches off-by-one bugs where checkpoint block data is incorrectly deleted.
1316    #[test]
1317    fn test_check_consistency_storages_history_preserves_checkpoint_block() {
1318        use alloy_primitives::U256;
1319        use reth_db_api::models::StorageBeforeTx;
1320
1321        const CHECKPOINT_BLOCK: u64 = 100;
1322        const SF_TIP: u64 = 200;
1323
1324        let temp_dir = TempDir::new().unwrap();
1325        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1326
1327        let factory = create_test_provider_factory();
1328        factory.set_storage_settings_cache(StorageSettings::v2());
1329
1330        let checkpoint_addr = Address::repeat_byte(0xAA);
1331        let checkpoint_slot = B256::repeat_byte(0xBB);
1332        let stale_addr = Address::repeat_byte(0xCC);
1333        let stale_slot = B256::repeat_byte(0xDD);
1334
1335        // Write storage changesets to static files
1336        {
1337            let sf_provider = factory.static_file_provider();
1338            let mut writer =
1339                sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1340
1341            for block_num in 0..=SF_TIP {
1342                let changeset = if block_num == CHECKPOINT_BLOCK {
1343                    vec![StorageBeforeTx {
1344                        address: checkpoint_addr,
1345                        key: checkpoint_slot,
1346                        value: U256::from(block_num),
1347                    }]
1348                } else if block_num > CHECKPOINT_BLOCK {
1349                    vec![StorageBeforeTx {
1350                        address: stale_addr,
1351                        key: stale_slot,
1352                        value: U256::from(block_num),
1353                    }]
1354                } else {
1355                    vec![StorageBeforeTx {
1356                        address: Address::ZERO,
1357                        key: B256::ZERO,
1358                        value: U256::ZERO,
1359                    }]
1360                };
1361                writer.append_storage_changeset(changeset, block_num).unwrap();
1362            }
1363            writer.commit().unwrap();
1364        }
1365
1366        // Set checkpoint
1367        {
1368            let provider = factory.database_provider_rw().unwrap();
1369            provider
1370                .save_stage_checkpoint(
1371                    StageId::IndexStorageHistory,
1372                    StageCheckpoint::new(CHECKPOINT_BLOCK),
1373                )
1374                .unwrap();
1375            provider.commit().unwrap();
1376        }
1377
1378        // Insert entry AT the checkpoint block (should be preserved)
1379        let checkpoint_key =
1380            StorageShardedKey::new(checkpoint_addr, checkpoint_slot, CHECKPOINT_BLOCK);
1381        let checkpoint_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]);
1382        rocksdb.put::<tables::StoragesHistory>(checkpoint_key.clone(), &checkpoint_list).unwrap();
1383
1384        // Insert stale entry AFTER the checkpoint (should be removed)
1385        let stale_key = StorageShardedKey::new(stale_addr, stale_slot, SF_TIP);
1386        let stale_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]);
1387        rocksdb.put::<tables::StoragesHistory>(stale_key.clone(), &stale_list).unwrap();
1388
1389        // Run healing
1390        let provider = factory.database_provider_ro().unwrap();
1391        let result = rocksdb.check_consistency(&provider).unwrap();
1392        assert_eq!(result, None, "Should heal without unwind");
1393
1394        // Verify checkpoint block entry is PRESERVED
1395        let preserved = rocksdb.get::<tables::StoragesHistory>(checkpoint_key).unwrap();
1396        assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");
1397
1398        // Verify stale entry is removed or unwound
1399        let stale = rocksdb.get::<tables::StoragesHistory>(stale_key).unwrap();
1400        assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
1401    }
1402
1403    /// Tests `AccountsHistory` changeset-based healing with enough blocks to trigger batching.
1404    ///
1405    /// Scenario:
1406    /// 1. Generate 15,000 blocks worth of account changeset data (to exceed the 10k batch size)
1407    /// 2. Each block has 1 account change (simple - just random addresses)
1408    /// 3. Write account changesets to static files for all 15k blocks
1409    /// 4. Set `IndexAccountHistory` checkpoint to block 5000
1410    /// 5. Insert stale `AccountsHistory` entries in `RocksDB` for addresses that changed in blocks
1411    ///    5001-15000
1412    /// 6. Run `check_consistency`
1413    /// 7. Verify:
1414    ///    - Stale entries for blocks > 5000 are pruned
1415    ///    - The batching worked (no OOM, completed successfully)
1416    #[test]
1417    fn test_check_consistency_accounts_history_heals_via_changesets_large_range() {
1418        use reth_db::models::AccountBeforeTx;
1419        use reth_db_api::models::ShardedKey;
1420        use reth_static_file_types::StaticFileSegment;
1421
1422        let temp_dir = TempDir::new().unwrap();
1423        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1424
1425        // Create test provider factory
1426        let factory = create_test_provider_factory();
1427        factory.set_storage_settings_cache(StorageSettings::v2());
1428
1429        const TOTAL_BLOCKS: u64 = 15_000;
1430        const CHECKPOINT_BLOCK: u64 = 5_000;
1431
1432        // Helper to generate address from block number (avoids pre-allocating 15k addresses)
1433        #[inline]
1434        fn make_address(block_num: u64) -> Address {
1435            let mut addr = Address::ZERO;
1436            addr.0[0..8].copy_from_slice(&block_num.to_le_bytes());
1437            addr
1438        }
1439
1440        // Write account changesets to static files for all 15k blocks
1441        {
1442            let sf_provider = factory.static_file_provider();
1443            let mut writer =
1444                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1445
1446            // Reuse changeset vec to avoid repeated allocations
1447            let mut changeset = Vec::with_capacity(1);
1448
1449            for block_num in 0..TOTAL_BLOCKS {
1450                changeset.clear();
1451                changeset.push(AccountBeforeTx { address: make_address(block_num), info: None });
1452                writer.append_account_changeset(changeset.clone(), block_num).unwrap();
1453            }
1454
1455            writer.commit().unwrap();
1456        }
1457
1458        // Insert stale AccountsHistory entries in RocksDB for addresses that changed
1459        // in blocks 5001-15000 (i.e., blocks after the checkpoint)
1460        // These should be pruned by check_consistency
1461        for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
1462            let key = ShardedKey::new(make_address(block_num), block_num);
1463            let block_list = BlockNumberList::new_pre_sorted([block_num]);
1464            rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
1465        }
1466
1467        // Also insert some valid entries for blocks <= 5000 that should NOT be pruned
1468        for block_num in [100u64, 500, 1000, 2500, 5000] {
1469            let key = ShardedKey::new(make_address(block_num), block_num);
1470            let block_list = BlockNumberList::new_pre_sorted([block_num]);
1471            rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
1472        }
1473
1474        // Verify we have entries before healing
1475        let entries_before: usize = rocksdb.iter::<tables::AccountsHistory>().unwrap().count();
1476        let stale_count = (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize;
1477        let valid_count = 5usize;
1478        assert_eq!(
1479            entries_before,
1480            stale_count + valid_count,
1481            "Should have {} stale + {} valid entries before healing",
1482            stale_count,
1483            valid_count
1484        );
1485
1486        // Set IndexAccountHistory checkpoint to block 5000
1487        {
1488            let provider = factory.database_provider_rw().unwrap();
1489            provider
1490                .save_stage_checkpoint(
1491                    StageId::IndexAccountHistory,
1492                    StageCheckpoint::new(CHECKPOINT_BLOCK),
1493                )
1494                .unwrap();
1495            provider.commit().unwrap();
1496        }
1497
1498        let provider = factory.database_provider_ro().unwrap();
1499
1500        // Verify sf_tip > checkpoint
1501        let sf_tip = provider
1502            .static_file_provider()
1503            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
1504            .unwrap();
1505        assert_eq!(sf_tip, TOTAL_BLOCKS - 1, "Static file tip should be 14999");
1506        assert!(sf_tip > CHECKPOINT_BLOCK, "sf_tip should be > checkpoint to trigger healing");
1507
1508        // Run check_consistency - this should trigger batched changeset-based healing
1509        let result = rocksdb.check_consistency(&provider).unwrap();
1510        assert_eq!(result, None, "Healing should succeed without requiring unwind");
1511
1512        // Verify: all stale entries for blocks > 5000 should be pruned
1513        // Count remaining entries with highest_block_number > checkpoint
1514        let mut remaining_stale = 0;
1515        for result in rocksdb.iter::<tables::AccountsHistory>().unwrap() {
1516            let (key, _) = result.unwrap();
1517            if key.highest_block_number > CHECKPOINT_BLOCK && key.highest_block_number != u64::MAX {
1518                remaining_stale += 1;
1519            }
1520        }
1521        assert_eq!(
1522            remaining_stale, 0,
1523            "All stale entries (block > {}) should be pruned",
1524            CHECKPOINT_BLOCK
1525        );
1526    }
1527
1528    /// Tests that accounts history healing preserves entries at exactly the checkpoint block.
1529    #[test]
1530    fn test_check_consistency_accounts_history_preserves_checkpoint_block() {
1531        use reth_db::models::AccountBeforeTx;
1532        use reth_db_api::models::ShardedKey;
1533
1534        const CHECKPOINT_BLOCK: u64 = 100;
1535        const SF_TIP: u64 = 200;
1536
1537        let temp_dir = TempDir::new().unwrap();
1538        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1539
1540        let factory = create_test_provider_factory();
1541        factory.set_storage_settings_cache(StorageSettings::v2());
1542
1543        let checkpoint_addr = Address::repeat_byte(0xAA);
1544        let stale_addr = Address::repeat_byte(0xCC);
1545
1546        // Write account changesets to static files
1547        {
1548            let sf_provider = factory.static_file_provider();
1549            let mut writer =
1550                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
1551
1552            for block_num in 0..=SF_TIP {
1553                let changeset = if block_num == CHECKPOINT_BLOCK {
1554                    vec![AccountBeforeTx { address: checkpoint_addr, info: None }]
1555                } else if block_num > CHECKPOINT_BLOCK {
1556                    vec![AccountBeforeTx { address: stale_addr, info: None }]
1557                } else {
1558                    vec![AccountBeforeTx { address: Address::ZERO, info: None }]
1559                };
1560                writer.append_account_changeset(changeset, block_num).unwrap();
1561            }
1562            writer.commit().unwrap();
1563        }
1564
1565        // Set checkpoint
1566        {
1567            let provider = factory.database_provider_rw().unwrap();
1568            provider
1569                .save_stage_checkpoint(
1570                    StageId::IndexAccountHistory,
1571                    StageCheckpoint::new(CHECKPOINT_BLOCK),
1572                )
1573                .unwrap();
1574            provider.commit().unwrap();
1575        }
1576
1577        // Insert entry AT the checkpoint block (should be preserved)
1578        let checkpoint_key = ShardedKey::new(checkpoint_addr, CHECKPOINT_BLOCK);
1579        let checkpoint_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]);
1580        rocksdb.put::<tables::AccountsHistory>(checkpoint_key.clone(), &checkpoint_list).unwrap();
1581
1582        // Insert stale entry AFTER the checkpoint (should be removed)
1583        let stale_key = ShardedKey::new(stale_addr, SF_TIP);
1584        let stale_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]);
1585        rocksdb.put::<tables::AccountsHistory>(stale_key.clone(), &stale_list).unwrap();
1586
1587        // Run healing
1588        let provider = factory.database_provider_ro().unwrap();
1589        let result = rocksdb.check_consistency(&provider).unwrap();
1590        assert_eq!(result, None, "Should heal without unwind");
1591
1592        // Verify checkpoint block entry is PRESERVED
1593        let preserved = rocksdb.get::<tables::AccountsHistory>(checkpoint_key).unwrap();
1594        assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");
1595
1596        // Verify stale entry is removed or unwound
1597        let stale = rocksdb.get::<tables::AccountsHistory>(stale_key).unwrap();
1598        assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
1599    }
1600
1601    #[test]
1602    fn test_check_consistency_storages_history_sf_tip_equals_checkpoint_no_action() {
1603        use alloy_primitives::U256;
1604        use reth_db::models::StorageBeforeTx;
1605        use reth_static_file_types::StaticFileSegment;
1606
1607        let temp_dir = TempDir::new().unwrap();
1608        let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
1609
1610        // Insert StoragesHistory entries into RocksDB
1611        let key1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
1612        let key2 = StorageShardedKey::new(Address::random(), B256::random(), 80);
1613        let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
1614        let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 80]);
1615        rocksdb.put::<tables::StoragesHistory>(key1, &block_list1).unwrap();
1616        rocksdb.put::<tables::StoragesHistory>(key2, &block_list2).unwrap();
1617
1618        // Capture entries before consistency check
1619        let entries_before: Vec<_> =
1620            rocksdb.iter::<tables::StoragesHistory>().unwrap().map(|r| r.unwrap()).collect();
1621
1622        // Create a test provider factory
1623        let factory = create_test_provider_factory();
1624        factory.set_storage_settings_cache(StorageSettings::v2());
1625
1626        // Write storage changesets to static files for blocks 0-100
1627        {
1628            let sf_provider = factory.static_file_provider();
1629            let mut writer =
1630                sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1631
1632            for block_num in 0..=100u64 {
1633                let changeset = vec![StorageBeforeTx {
1634                    address: Address::ZERO,
1635                    key: B256::with_last_byte(block_num as u8),
1636                    value: U256::from(block_num),
1637                }];
1638                writer.append_storage_changeset(changeset, block_num).unwrap();
1639            }
1640            writer.commit().unwrap();
1641        }
1642
1643        // Set IndexStorageHistory checkpoint to block 100 (same as sf_tip)
1644        {
1645            let provider = factory.database_provider_rw().unwrap();
1646            provider
1647                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
1648                .unwrap();
1649            provider.commit().unwrap();
1650        }
1651
1652        let provider = factory.database_provider_ro().unwrap();
1653
1654        // Verify sf_tip equals checkpoint (both at 100)
1655        let sf_tip = provider
1656            .static_file_provider()
1657            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
1658            .unwrap();
1659        assert_eq!(sf_tip, 100, "Static file tip should be 100");
1660
1661        // Run check_consistency - should return None (no unwind needed)
1662        let result = rocksdb.check_consistency(&provider).unwrap();
1663        assert_eq!(result, None, "sf_tip == checkpoint should not require unwind");
1664
1665        // Verify NO entries are deleted - RocksDB state unchanged
1666        let entries_after: Vec<_> =
1667            rocksdb.iter::<tables::StoragesHistory>().unwrap().map(|r| r.unwrap()).collect();
1668
1669        assert_eq!(
1670            entries_after.len(),
1671            entries_before.len(),
1672            "RocksDB entry count should be unchanged when sf_tip == checkpoint"
1673        );
1674
1675        // Verify exact entries are preserved
1676        for (before, after) in entries_before.iter().zip(entries_after.iter()) {
1677            assert_eq!(before.0, after.0, "Entry key should be unchanged");
1678            assert_eq!(before.1, after.1, "Entry block list should be unchanged");
1679        }
1680    }
1681}