reth_provider/providers/rocksdb/invariants.rs

//! Invariant checking for `RocksDB` tables.
//!
//! This module provides consistency checks for tables stored in `RocksDB`, similar to the
//! consistency checks for static files. The goal is to detect and potentially heal
//! inconsistencies between `RocksDB` data and MDBX checkpoints.

use super::RocksDBProvider;
use crate::StaticFileProviderFactory;
use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::BlockNumber;
use rayon::prelude::*;
use reth_db_api::tables;
use reth_stages_types::StageId;
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{
    BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StageCheckpointReader,
    StorageChangeSetReader, StorageSettingsCache, TransactionsProvider,
};
use reth_storage_errors::provider::ProviderResult;
use std::collections::HashSet;

/// Batch size for changeset iteration during history healing.
/// Balances memory usage against iteration overhead.
const HEAL_HISTORY_BATCH_SIZE: u64 = 10_000;

impl RocksDBProvider {
    /// Checks consistency of `RocksDB` tables against MDBX stage checkpoints.
    ///
    /// Returns an unwind target block number if the pipeline needs to unwind to rebuild
    /// `RocksDB` data. Returns `None` if all invariants pass or if inconsistencies were healed.
    ///
    /// # Invariants checked
    ///
    /// For `TransactionHashNumbers`:
    /// - The maximum `TxNumber` value should not exceed what the `TransactionLookup` stage
    ///   checkpoint indicates has been processed.
    /// - If `RocksDB` is ahead, excess entries are pruned (healed).
    /// - If `RocksDB` is behind, an unwind is required.
    ///
    /// For `StoragesHistory` and `AccountsHistory`:
    /// - Uses changesets to heal stale entries when static file tip > checkpoint.
    ///
    /// # Requirements
    ///
    /// For pruning `TransactionHashNumbers`, the provider must be able to supply transaction
    /// data (typically from static files) so that transaction hashes can be computed. This
    /// implies that static files should be ahead of or in sync with `RocksDB`.
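    ///
    /// # Example
    ///
    /// A minimal sketch of how a caller might act on the result; the `pipeline` handle and
    /// its `unwind_to` method are hypothetical stand-ins for whatever drives the unwind in
    /// the surrounding code:
    ///
    /// ```ignore
    /// if let Some(target) = rocksdb.check_consistency(&provider)? {
    ///     // RocksDB could not be healed in place, so the pipeline must unwind
    ///     // to `target` before syncing resumes.
    ///     pipeline.unwind_to(target)?;
    /// }
    /// ```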
    pub fn check_consistency<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider
            + StageCheckpointReader
            + StorageSettingsCache
            + StaticFileProviderFactory
            + BlockBodyIndicesProvider
            + StorageChangeSetReader
            + ChangeSetReader
            + TransactionsProvider<Transaction: Encodable2718>,
    {
        let mut unwind_target: Option<BlockNumber> = None;

        // Heal TransactionHashNumbers if stored in RocksDB
        if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb &&
            let Some(target) = self.heal_transaction_hash_numbers(provider)?
        {
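            // Keep the smallest target seen across all tables: unwinding to the lowest
            // block number satisfies every table that needs rebuilding.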
            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
        }

        // Heal StoragesHistory if stored in RocksDB
        if provider.cached_storage_settings().storages_history_in_rocksdb &&
            let Some(target) = self.heal_storages_history(provider)?
        {
            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
        }

        // Heal AccountsHistory if stored in RocksDB
        if provider.cached_storage_settings().account_history_in_rocksdb &&
            let Some(target) = self.heal_accounts_history(provider)?
        {
            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
        }

        Ok(unwind_target)
    }

    /// Heals the `TransactionHashNumbers` table.
    ///
    /// - Fast path: if checkpoint == 0 AND `RocksDB` has data, clear everything
    /// - If `sf_tip` < checkpoint, return unwind target (static files behind)
    /// - If `sf_tip` == checkpoint, nothing to do
    /// - If `sf_tip` > checkpoint, heal via transaction ranges in batches
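    ///
    /// Illustrative shape of the healing range when `sf_tip` > checkpoint (the numbers are
    /// examples only, not taken from real data):
    ///
    /// ```text
    /// checkpoint         = 100   (TransactionLookup stage checkpoint)
    /// sf_tip             = 110   (Transactions static file tip)
    /// checkpoint_next_tx = first tx after block 100, from MDBX body indices
    /// sf_tip_end_tx      = highest tx number in static files
    /// pruned             = checkpoint_next_tx..=sf_tip_end_tx, in 10_000-tx batches
    /// ```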
    fn heal_transaction_hash_numbers<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider
            + StageCheckpointReader
            + StaticFileProviderFactory
            + BlockBodyIndicesProvider
            + TransactionsProvider<Transaction: Encodable2718>,
    {
        let checkpoint = provider
            .get_stage_checkpoint(StageId::TransactionLookup)?
            .map(|cp| cp.block_number)
            .unwrap_or(0);

        let sf_tip = provider
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::Transactions)
            .unwrap_or(0);

        // Fast path: if checkpoint is 0 and RocksDB has data, clear everything.
        if checkpoint == 0 && self.first::<tables::TransactionHashNumbers>()?.is_some() {
            tracing::info!(
                target: "reth::providers::rocksdb",
                "TransactionHashNumbers has data but checkpoint is 0, clearing all"
            );
            self.clear::<tables::TransactionHashNumbers>()?;
            return Ok(None);
        }

        if sf_tip < checkpoint {
            // This should never happen in normal operation - static files are always committed
            // before RocksDB. If we get here, something is seriously wrong. The unwind is a
            // best-effort attempt but is probably futile.
            tracing::warn!(
                target: "reth::providers::rocksdb",
                sf_tip,
                checkpoint,
                "TransactionHashNumbers: static file tip behind checkpoint, unwind needed"
            );
            return Ok(Some(sf_tip));
        }

        // sf_tip == checkpoint - nothing to do
        if sf_tip == checkpoint {
            return Ok(None);
        }

        // Get end tx from static files (authoritative for sf_tip)
        let sf_tip_end_tx = provider
            .static_file_provider()
            .get_highest_static_file_tx(StaticFileSegment::Transactions)
            .unwrap_or(0);

        // Get the first tx after the checkpoint block from MDBX (authoritative up to checkpoint)
        let checkpoint_next_tx = provider
            .block_body_indices(checkpoint)?
            .map(|indices| indices.next_tx_num())
            .unwrap_or(0);

        if sf_tip_end_tx < checkpoint_next_tx {
            // This should never happen in normal operation - static files should have all
            // transactions up to sf_tip. If we get here, something is seriously wrong.
            // The unwind is a best-effort attempt but is probably futile.
            tracing::warn!(
                target: "reth::providers::rocksdb",
                sf_tip_end_tx,
                checkpoint_next_tx,
                checkpoint,
                sf_tip,
                "TransactionHashNumbers: static file tx tip behind checkpoint, unwind needed"
            );
            return Ok(Some(sf_tip));
        }

        tracing::info!(
            target: "reth::providers::rocksdb",
            checkpoint,
            sf_tip,
            checkpoint_next_tx,
            sf_tip_end_tx,
            "TransactionHashNumbers: healing via transaction ranges"
        );

        const BATCH_SIZE: u64 = 10_000;
        let mut batch_start = checkpoint_next_tx;

        while batch_start <= sf_tip_end_tx {
            let batch_end = batch_start.saturating_add(BATCH_SIZE - 1).min(sf_tip_end_tx);

            tracing::debug!(
                target: "reth::providers::rocksdb",
                batch_start,
                batch_end,
                "Pruning TransactionHashNumbers batch"
            );

            self.prune_transaction_hash_numbers_in_range(provider, batch_start..=batch_end)?;

            batch_start = batch_end.saturating_add(1);
        }

        Ok(None)
    }

    /// Prunes `TransactionHashNumbers` entries for transactions in the given range.
    ///
    /// This fetches transactions from the provider, computes their hashes in parallel,
    /// and deletes the corresponding entries from `RocksDB` by key. This approach is more
    /// scalable than iterating all rows because it only processes the transactions that
    /// need to be pruned.
    ///
    /// # Requirements
    ///
    /// The provider must be able to supply transaction data (typically from static files)
    /// so that transaction hashes can be computed. This implies that static files should
    /// be ahead of or in sync with `RocksDB`.
    fn prune_transaction_hash_numbers_in_range<Provider>(
        &self,
        provider: &Provider,
        tx_range: std::ops::RangeInclusive<u64>,
    ) -> ProviderResult<()>
    where
        Provider: TransactionsProvider<Transaction: Encodable2718>,
    {
        if tx_range.is_empty() {
            return Ok(());
        }

        // Fetch transactions in the range and compute their hashes in parallel
        let hashes: Vec<_> = provider
            .transactions_by_tx_range(tx_range.clone())?
            .into_par_iter()
            .map(|tx| tx.trie_hash())
            .collect();

        if !hashes.is_empty() {
            tracing::info!(
                target: "reth::providers::rocksdb",
                deleted_count = hashes.len(),
                tx_range_start = *tx_range.start(),
                tx_range_end = *tx_range.end(),
                "Pruning TransactionHashNumbers entries by tx range"
            );

            let mut batch = self.batch();
            for hash in hashes {
                batch.delete::<tables::TransactionHashNumbers>(hash)?;
            }
            batch.commit()?;
        }

        Ok(())
    }

    /// Heals the `StoragesHistory` table by removing stale entries.
    ///
    /// Returns an unwind target if static file tip is behind checkpoint (cannot heal).
    /// Otherwise iterates changesets in batches to identify and unwind affected keys.
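    ///
    /// Rough flow of the healing path (a sketch of the loop below, not extra behavior):
    ///
    /// ```text
    /// for each batch of HEAL_HISTORY_BATCH_SIZE blocks in (checkpoint, sf_tip]:
    ///     read storage changesets for the batch
    ///     collect unique (address, slot) keys
    ///     unwind their history shards from checkpoint + 1
    /// ```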
    fn heal_storages_history<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider:
            DBProvider + StageCheckpointReader + StaticFileProviderFactory + StorageChangeSetReader,
    {
        let checkpoint = provider
            .get_stage_checkpoint(StageId::IndexStorageHistory)?
            .map(|cp| cp.block_number)
            .unwrap_or(0);

        // Fast path: if checkpoint is 0 and RocksDB has data, clear everything.
        if checkpoint == 0 && self.first::<tables::StoragesHistory>()?.is_some() {
            tracing::info!(
                target: "reth::providers::rocksdb",
                "StoragesHistory has data but checkpoint is 0, clearing all"
            );
            self.clear::<tables::StoragesHistory>()?;
            return Ok(None);
        }

        let sf_tip = provider
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
            .unwrap_or(0);

        if sf_tip < checkpoint {
            // This should never happen in normal operation - static files are always
            // committed before RocksDB. If we get here, something is seriously wrong.
            // The unwind is a best-effort attempt but is probably futile.
            tracing::warn!(
                target: "reth::providers::rocksdb",
                sf_tip,
                checkpoint,
                "StoragesHistory: static file tip behind checkpoint, unwind needed"
            );
            return Ok(Some(sf_tip));
        }

        if sf_tip == checkpoint {
            return Ok(None);
        }

        let total_blocks = sf_tip - checkpoint;
        tracing::info!(
            target: "reth::providers::rocksdb",
            checkpoint,
            sf_tip,
            total_blocks,
            "StoragesHistory: healing via changesets"
        );

        let mut batch_start = checkpoint + 1;
        let mut batch_num = 0u64;
        let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);

        while batch_start <= sf_tip {
            let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
            batch_num += 1;

            let changesets = provider.storage_changesets_range(batch_start..=batch_end)?;

            let unique_keys: HashSet<_> = changesets
                .into_iter()
                .map(|(block_addr, entry)| (block_addr.address(), entry.key, checkpoint + 1))
                .collect();
            let indices: Vec<_> = unique_keys.into_iter().collect();

            if !indices.is_empty() {
                tracing::info!(
                    target: "reth::providers::rocksdb",
                    batch_num,
                    total_batches,
                    batch_start,
                    batch_end,
                    indices_count = indices.len(),
                    "StoragesHistory: unwinding batch"
                );

                let batch = self.unwind_storage_history_indices(&indices)?;
                self.commit_batch(batch)?;
            }

            batch_start = batch_end + 1;
        }

        Ok(None)
    }

    /// Heals the `AccountsHistory` table by removing stale entries.
    ///
    /// Returns an unwind target if static file tip is behind checkpoint (cannot heal).
    /// Otherwise iterates changesets in batches to identify and unwind affected keys.
    fn heal_accounts_history<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + StageCheckpointReader + StaticFileProviderFactory + ChangeSetReader,
    {
        let checkpoint = provider
            .get_stage_checkpoint(StageId::IndexAccountHistory)?
            .map(|cp| cp.block_number)
            .unwrap_or(0);

        // Fast path: if checkpoint is 0 and RocksDB has data, clear everything.
        if checkpoint == 0 && self.first::<tables::AccountsHistory>()?.is_some() {
            tracing::info!(
                target: "reth::providers::rocksdb",
                "AccountsHistory has data but checkpoint is 0, clearing all"
            );
            self.clear::<tables::AccountsHistory>()?;
            return Ok(None);
        }

        let sf_tip = provider
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
            .unwrap_or(0);

        if sf_tip < checkpoint {
            // This should never happen in normal operation - static files are always
            // committed before RocksDB. If we get here, something is seriously wrong.
            // The unwind is a best-effort attempt but is probably futile.
            tracing::warn!(
                target: "reth::providers::rocksdb",
                sf_tip,
                checkpoint,
                "AccountsHistory: static file tip behind checkpoint, unwind needed"
            );
            return Ok(Some(sf_tip));
        }

        if sf_tip == checkpoint {
            return Ok(None);
        }

        let total_blocks = sf_tip - checkpoint;
        tracing::info!(
            target: "reth::providers::rocksdb",
            checkpoint,
            sf_tip,
            total_blocks,
            "AccountsHistory: healing via changesets"
        );

        let mut batch_start = checkpoint + 1;
        let mut batch_num = 0u64;
        let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);

        while batch_start <= sf_tip {
            let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
            batch_num += 1;

            let changesets = provider.account_changesets_range(batch_start..=batch_end)?;

            let mut addresses = HashSet::with_capacity(changesets.len());
            addresses.extend(changesets.iter().map(|(_, cs)| cs.address));
            let unwind_from = checkpoint + 1;
            let indices: Vec<_> = addresses.into_iter().map(|addr| (addr, unwind_from)).collect();

            if !indices.is_empty() {
                tracing::info!(
                    target: "reth::providers::rocksdb",
                    batch_num,
                    total_batches,
                    batch_start,
                    batch_end,
                    indices_count = indices.len(),
                    "AccountsHistory: unwinding batch"
                );

                let batch = self.unwind_account_history_indices(&indices)?;
                self.commit_batch(batch)?;
            }

            batch_start = batch_end + 1;
        }

        Ok(None)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        providers::{rocksdb::RocksDBBuilder, static_file::StaticFileWriter},
        test_utils::create_test_provider_factory,
        BlockWriter, DatabaseProviderFactory, StageCheckpointWriter, TransactionsProvider,
    };
    use alloy_primitives::{Address, B256};
    use reth_db::cursor::{DbCursorRO, DbCursorRW};
    use reth_db_api::{
        models::{storage_sharded_key::StorageShardedKey, StorageSettings},
        tables::{self, BlockNumberList},
        transaction::DbTxMut,
    };
    use reth_stages_types::StageCheckpoint;
    use reth_testing_utils::generators::{self, BlockRangeParams};
    use tempfile::TempDir;

    #[test]
    fn test_first_last_empty_rocksdb() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        // Empty RocksDB, no checkpoints - should be consistent
        let first = provider.first::<tables::TransactionHashNumbers>().unwrap();
        let last = provider.last::<tables::TransactionHashNumbers>().unwrap();

        assert!(first.is_none());
        assert!(last.is_none());
    }

    #[test]
    fn test_first_last_with_data() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        // Insert some data
        let tx_hash = B256::from([1u8; 32]);
        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();

        // RocksDB has data
        let last = provider.last::<tables::TransactionHashNumbers>().unwrap();
        assert!(last.is_some());
        assert_eq!(last.unwrap().1, 100);
    }

    #[test]
    fn test_check_consistency_empty_rocksdb_no_checkpoint_is_ok() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy()
                .with_transaction_hash_numbers_in_rocksdb(true)
                .with_storages_history_in_rocksdb(true),
        );

        let provider = factory.database_provider_ro().unwrap();

        // Empty RocksDB and no checkpoints - should be consistent (None = no unwind needed)
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None);
    }

    #[test]
    fn test_check_consistency_empty_rocksdb_with_checkpoint_is_first_run() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
        );

        // Set a checkpoint indicating we should have processed up to block 100
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB is empty but checkpoint says block 100 was processed.
        // Since static file tip defaults to 0 when None, and 0 < 100, an unwind is triggered.
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(0), "Static file tip (0) behind checkpoint (100) triggers unwind");
    }

    /// Tests that when checkpoint=0 and `RocksDB` has data, all entries are pruned.
    /// This simulates a crash recovery scenario where the checkpoint was lost.
    #[test]
    fn test_check_consistency_checkpoint_zero_with_rocksdb_data_prunes_all() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
        );

        // Generate blocks with real transactions and insert them
        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=2,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        let mut tx_hashes = Vec::new();
        {
            let provider = factory.database_provider_rw().unwrap();
            let mut tx_count = 0u64;
            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    tx_hashes.push(hash);
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }
            provider.commit().unwrap();
        }

        // Explicitly clear the TransactionLookup checkpoint to simulate crash recovery
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(0))
                .unwrap();
            provider.commit().unwrap();
        }

        // Verify RocksDB data exists
        assert!(rocksdb.last::<tables::TransactionHashNumbers>().unwrap().is_some());

        let provider = factory.database_provider_ro().unwrap();

        // checkpoint = 0 but RocksDB has data.
        // This means RocksDB has stale data that should be cleared.
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by clearing, no unwind needed");

        // Verify data was cleared
        for hash in &tx_hashes {
            assert!(
                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
                "RocksDB should be empty after pruning"
            );
        }
    }

    #[test]
    fn test_check_consistency_storages_history_empty_with_checkpoint_is_first_run() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_storages_history_in_rocksdb(true),
        );

        // Set a checkpoint indicating we should have processed up to block 100
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB is empty but checkpoint says block 100 was processed.
        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
        // This should never happen in normal operation.
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
    }

    #[test]
    fn test_check_consistency_storages_history_has_data_no_checkpoint_prunes_data() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        // Insert data into RocksDB
        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
        rocksdb.put::<tables::StoragesHistory>(key, &block_list).unwrap();

        // Verify data exists
        assert!(rocksdb.last::<tables::StoragesHistory>().unwrap().is_some());

        // Create a test provider factory for MDBX with NO checkpoint
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_storages_history_in_rocksdb(true),
        );

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB has data but checkpoint is 0
        // This means RocksDB has stale data that should be pruned (healed)
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        // Verify data was pruned
        assert!(
            rocksdb.last::<tables::StoragesHistory>().unwrap().is_none(),
            "RocksDB should be empty after pruning"
        );
    }

    #[test]
    fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
        );

        // Generate blocks with real transactions (blocks 0-2, 6 transactions total)
        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=2,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        {
            let provider = factory.database_provider_rw().unwrap();
            let mut tx_count = 0u64;
            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }
            provider.commit().unwrap();
        }

        // Set checkpoint to block 10 (beyond our actual data at block 2)
        // sf_tip is at block 2, checkpoint is at block 10
        // Since sf_tip < checkpoint, we need to unwind to sf_tip
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(10))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // sf_tip (2) < checkpoint (10), so unwind to sf_tip is needed
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(2), "sf_tip < checkpoint requires unwind to sf_tip");
    }

    #[test]
    fn test_check_consistency_rocksdb_ahead_of_checkpoint_prunes_excess() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
        );

        // Generate blocks with real transactions:
        // Blocks 0-5, each with 2 transactions = 12 total transactions (0-11)
        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=5,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        // Track which hashes belong to which blocks
        let mut tx_hashes = Vec::new();
        let mut tx_count = 0u64;
        {
            let provider = factory.database_provider_rw().unwrap();
            // Insert ALL blocks (0-5) to write transactions to static files
            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    tx_hashes.push(hash);
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }
            provider.commit().unwrap();
        }

        // Simulate crash recovery scenario:
        // MDBX was unwound to block 2, but RocksDB and static files still have more data.
        // Remove TransactionBlocks entries for blocks 3-5 to simulate MDBX unwind.
        {
            let provider = factory.database_provider_rw().unwrap();
            // Delete TransactionBlocks entries for tx > 5 (i.e., for blocks 3-5)
            // TransactionBlocks maps last_tx_in_block -> block_number
            // After unwind, only entries for blocks 0-2 should remain (tx 5 -> block 2)
            let mut cursor = provider.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();
            // Walk and delete entries where block > 2
            let mut to_delete = Vec::new();
            let mut walker = cursor.walk(Some(0)).unwrap();
            while let Some((tx_num, block_num)) = walker.next().transpose().unwrap() {
                if block_num > 2 {
                    to_delete.push(tx_num);
                }
            }
            drop(walker);
            for tx_num in to_delete {
                cursor.seek_exact(tx_num).unwrap();
                cursor.delete_current().unwrap();
            }

            // Set checkpoint to block 2
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB has tx hashes for all blocks (0-5)
        // MDBX TransactionBlocks only goes up to tx 5 (block 2)
        // Static files have data for all txs (0-11)
        // This means RocksDB is ahead and should prune entries for tx 6-11
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        // Verify: hashes for blocks 0-2 (tx 0-5) should remain, blocks 3-5 (tx 6-11) should be
        // pruned. First 6 hashes should remain.
        for (i, hash) in tx_hashes.iter().take(6).enumerate() {
            assert!(
                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_some(),
                "tx {} should remain",
                i
            );
        }
        // Last 6 hashes should be pruned
        for (i, hash) in tx_hashes.iter().skip(6).enumerate() {
            assert!(
                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
                "tx {} should be pruned",
                i + 6
            );
        }
    }

    #[test]
    fn test_check_consistency_storages_history_sentinel_only_with_checkpoint_is_first_run() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        // Insert ONLY sentinel entries (highest_block_number = u64::MAX)
        // This simulates a scenario where history tracking started but no shards were completed
        let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
        let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
        rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
        rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();

        // Verify entries exist (not empty table)
        assert!(rocksdb.first::<tables::StoragesHistory>().unwrap().is_some());

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_storages_history_in_rocksdb(true),
        );

        // Set a checkpoint indicating we should have processed up to block 100
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB has only sentinel entries but checkpoint is set.
        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
        // This should never happen in normal operation.
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
    }

    #[test]
    fn test_check_consistency_accounts_history_sentinel_only_with_checkpoint_is_first_run() {
        use reth_db_api::models::ShardedKey;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::AccountsHistory>()
            .build()
            .unwrap();

        // Insert ONLY sentinel entries (highest_block_number = u64::MAX)
        let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
        let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
        rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
        rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();

        // Verify entries exist (not empty table)
        assert!(rocksdb.first::<tables::AccountsHistory>().unwrap().is_some());

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_account_history_in_rocksdb(true),
        );

        // Set a checkpoint indicating we should have processed up to block 100
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB has only sentinel entries but checkpoint is set.
        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
        // This should never happen in normal operation.
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
    }

    /// Test that pruning works by fetching transactions and computing their hashes,
    /// rather than iterating all rows. This test uses random blocks with unique
    /// transactions so we can verify the correct entries are pruned.
    #[test]
    fn test_prune_transaction_hash_numbers_by_range() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
        );

        // Generate random blocks with unique transactions.
        // Blocks 0-5 each have 2 transactions = 12 transactions total.
        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=5,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        // Insert blocks into the database
        let mut tx_count = 0u64;
        let mut tx_hashes = Vec::new();
        {
            let provider = factory.database_provider_rw().unwrap();

            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();

                // Store transaction hash -> tx_number mappings in RocksDB
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    tx_hashes.push(hash);
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }

            // Set checkpoint to block 2 (meaning we should only have tx hashes for blocks 0-2)
            // Blocks 0, 1, 2 have 6 transactions (2 each), so tx 0-5 should remain
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
                .unwrap();
            provider.commit().unwrap();
        }

        // At this point:
        // - RocksDB has tx hashes for blocks 0-5 (12 total: 2 per block)
        // - Checkpoint says we only processed up to block 2
        // - We need to prune tx hashes for blocks 3, 4, 5 (tx 6-11)

        // Verify RocksDB has the expected number of entries before pruning
        let rocksdb_count_before: usize =
            rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().count();
        assert_eq!(
            rocksdb_count_before, tx_count as usize,
            "RocksDB should have all {} transaction hashes before pruning",
            tx_count
        );

        let provider = factory.database_provider_ro().unwrap();

        // Verify we can fetch transactions by tx range
        let all_txs = provider.transactions_by_tx_range(0..tx_count).unwrap();
        assert_eq!(all_txs.len(), tx_count as usize, "Should be able to fetch all transactions");

        // Verify the hashes match between what we stored and what we compute from fetched txs
        for (i, tx) in all_txs.iter().enumerate() {
            let computed_hash = tx.trie_hash();
            assert_eq!(
                computed_hash, tx_hashes[i],
                "Hash mismatch for tx {}: stored {:?} vs computed {:?}",
                i, tx_hashes[i], computed_hash
            );
        }

        // Blocks 0, 1, 2 have 2 tx each = 6 tx total (indices 0-5)
        // We want to keep tx 0-5, prune tx 6-11
        let max_tx_to_keep = 5u64;
        let tx_to_prune_start = max_tx_to_keep + 1;

        // Prune transactions 6-11 (blocks 3-5)
        rocksdb
            .prune_transaction_hash_numbers_in_range(&provider, tx_to_prune_start..=(tx_count - 1))
            .expect("prune should succeed");

        // Verify: transactions 0-5 should remain, 6-11 should be pruned
        let mut remaining_count = 0;
        for result in rocksdb.iter::<tables::TransactionHashNumbers>().unwrap() {
            let (_hash, tx_num) = result.unwrap();
            assert!(
                tx_num <= max_tx_to_keep,
                "Transaction {} should have been pruned (> {})",
                tx_num,
                max_tx_to_keep
            );
            remaining_count += 1;
        }
        assert_eq!(
            remaining_count,
            (max_tx_to_keep + 1) as usize,
            "Should have {} transactions (0-{})",
            max_tx_to_keep + 1,
            max_tx_to_keep
        );
    }

    #[test]
    fn test_check_consistency_accounts_history_empty_with_checkpoint_is_first_run() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::AccountsHistory>()
            .build()
            .unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_account_history_in_rocksdb(true),
        );

        // Set a checkpoint indicating we should have processed up to block 100
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB is empty but checkpoint says block 100 was processed.
        // Since sf_tip=0 < checkpoint=100, we return unwind target of 0.
        // This should never happen in normal operation.
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target");
    }

    #[test]
    fn test_check_consistency_accounts_history_has_data_no_checkpoint_prunes_data() {
        use reth_db_api::models::ShardedKey;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::AccountsHistory>()
            .build()
            .unwrap();

        // Insert data into RocksDB
        let key = ShardedKey::new(Address::ZERO, 50);
        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
        rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();

        // Verify data exists
        assert!(rocksdb.last::<tables::AccountsHistory>().unwrap().is_some());

        // Create a test provider factory for MDBX with NO checkpoint
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_account_history_in_rocksdb(true),
        );

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB has data but checkpoint is 0
        // This means RocksDB has stale data that should be pruned (healed)
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        // Verify data was pruned
        assert!(
            rocksdb.last::<tables::AccountsHistory>().unwrap().is_none(),
            "RocksDB should be empty after pruning"
        );
    }

    #[test]
    fn test_check_consistency_accounts_history_sf_tip_equals_checkpoint_no_action() {
        use reth_db::models::AccountBeforeTx;
        use reth_db_api::models::ShardedKey;
        use reth_static_file_types::StaticFileSegment;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::AccountsHistory>()
            .build()
            .unwrap();

        // Insert some AccountsHistory entries with various highest_block_numbers
        let key1 = ShardedKey::new(Address::ZERO, 50);
        let key2 = ShardedKey::new(Address::random(), 75);
        let key3 = ShardedKey::new(Address::random(), u64::MAX); // sentinel
        let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
        let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 75]);
        let block_list3 = BlockNumberList::new_pre_sorted([80, 90, 100]);
        rocksdb.put::<tables::AccountsHistory>(key1, &block_list1).unwrap();
        rocksdb.put::<tables::AccountsHistory>(key2, &block_list2).unwrap();
        rocksdb.put::<tables::AccountsHistory>(key3, &block_list3).unwrap();

        // Capture RocksDB state before consistency check
        let entries_before: Vec<_> =
            rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|r| r.unwrap()).collect();
        assert_eq!(entries_before.len(), 3, "Should have 3 entries before check");

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_account_history_in_rocksdb(true),
        );

        // Write account changesets to static files for blocks 0-100
        {
            let sf_provider = factory.static_file_provider();
            let mut writer =
                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

            for block_num in 0..=100 {
                let changeset = vec![AccountBeforeTx { address: Address::random(), info: None }];
                writer.append_account_changeset(changeset, block_num).unwrap();
            }

            writer.commit().unwrap();
        }

        // Set IndexAccountHistory checkpoint to block 100 (same as sf_tip)
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // Verify sf_tip equals checkpoint (both at 100)
        let sf_tip = provider
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
            .unwrap();
        assert_eq!(sf_tip, 100, "Static file tip should be 100");

        // Run check_consistency - should return None (no unwind needed)
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "sf_tip == checkpoint should not require unwind");

        // Verify NO entries are deleted - RocksDB state unchanged
        let entries_after: Vec<_> =
            rocksdb.iter::<tables::AccountsHistory>().unwrap().map(|r| r.unwrap()).collect();

        assert_eq!(
            entries_after.len(),
            entries_before.len(),
            "RocksDB entry count should be unchanged when sf_tip == checkpoint"
        );

        // Verify exact entries are preserved
        for (before, after) in entries_before.iter().zip(entries_after.iter()) {
            assert_eq!(before.0.key, after.0.key, "Entry key should be unchanged");
            assert_eq!(
                before.0.highest_block_number, after.0.highest_block_number,
                "Entry highest_block_number should be unchanged"
            );
            assert_eq!(before.1, after.1, "Entry block list should be unchanged");
        }
    }

1204    /// Tests `StoragesHistory` changeset-based healing with enough blocks to trigger batching.
1205    ///
1206    /// Scenario:
1207    /// 1. Generate 15,000 blocks worth of storage changeset data (to exceed the 10k batch size)
1208    /// 2. Each block has 1 storage change (address + slot + value)
1209    /// 3. Write storage changesets to static files for all 15k blocks
1210    /// 4. Set `IndexStorageHistory` checkpoint to block 5000
1211    /// 5. Insert stale `StoragesHistory` entries in `RocksDB` for (address, slot) pairs that
1212    ///    changed in blocks 5001-15000
1213    /// 6. Run `check_consistency`
1214    /// 7. Verify stale entries for blocks > 5000 are pruned and batching worked
1215    #[test]
1216    fn test_check_consistency_storages_history_heals_via_changesets_large_range() {
1217        use alloy_primitives::U256;
1218        use reth_db_api::models::StorageBeforeTx;
1219
1220        const TOTAL_BLOCKS: u64 = 15_000;
1221        const CHECKPOINT_BLOCK: u64 = 5_000;
1222
1223        let temp_dir = TempDir::new().unwrap();
1224        let rocksdb = RocksDBBuilder::new(temp_dir.path())
1225            .with_table::<tables::StoragesHistory>()
1226            .build()
1227            .unwrap();
1228
1229        let factory = create_test_provider_factory();
1230        factory.set_storage_settings_cache(
1231            StorageSettings::legacy()
1232                .with_storages_history_in_rocksdb(true)
1233                .with_storage_changesets_in_static_files(true),
1234        );
1235
1236        // Helper to generate address from block number (reuses stack arrays)
1237        #[inline]
1238        fn make_address(block_num: u64) -> Address {
1239            let mut addr_bytes = [0u8; 20];
1240            addr_bytes[0..8].copy_from_slice(&block_num.to_le_bytes());
1241            Address::from(addr_bytes)
1242        }
1243
1244        // Helper to generate slot from block number (reuses stack arrays)
1245        #[inline]
1246        fn make_slot(block_num: u64) -> B256 {
1247            let mut slot_bytes = [0u8; 32];
1248            slot_bytes[0..8].copy_from_slice(&block_num.to_le_bytes());
1249            B256::from(slot_bytes)
1250        }
1251
1252        // Write storage changesets to static files for 15k blocks.
1253        // Each block has 1 storage change with a unique (address, slot) pair.
1254        {
1255            let sf_provider = factory.static_file_provider();
1256            let mut writer =
1257                sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
1258
1259            // Reuse changeset vec to avoid repeated allocations
1260            let mut changeset = Vec::with_capacity(1);
1261
1262            for block_num in 0..TOTAL_BLOCKS {
1263                changeset.clear();
1264                changeset.push(StorageBeforeTx {
1265                    address: make_address(block_num),
1266                    key: make_slot(block_num),
1267                    value: U256::from(block_num),
1268                });
1269
1270                writer.append_storage_changeset(changeset.clone(), block_num).unwrap();
1271            }
1272
1273            writer.commit().unwrap();
1274        }
1275
1276        // Verify static files have data up to block 14999
1277        {
1278            let sf_provider = factory.static_file_provider();
1279            let highest = sf_provider
1280                .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
1281                .unwrap();
1282            assert_eq!(highest, TOTAL_BLOCKS - 1, "Static files should have blocks 0..14999");
1283        }
1284
1285        // Set IndexStorageHistory checkpoint to block 5000
1286        {
1287            let provider = factory.database_provider_rw().unwrap();
1288            provider
1289                .save_stage_checkpoint(
1290                    StageId::IndexStorageHistory,
1291                    StageCheckpoint::new(CHECKPOINT_BLOCK),
1292                )
1293                .unwrap();
1294            provider.commit().unwrap();
1295        }
1296
1297        // Insert stale StoragesHistory entries for blocks 5001-14999
1298        // These are (address, slot) pairs that changed after the checkpoint
1299        for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
1300            let key =
1301                StorageShardedKey::new(make_address(block_num), make_slot(block_num), block_num);
1302            let block_list = BlockNumberList::new_pre_sorted([block_num]);
1303            rocksdb.put::<tables::StoragesHistory>(key, &block_list).unwrap();
1304        }

        // Verify RocksDB has stale entries before healing
        let count_before: usize = rocksdb.iter::<tables::StoragesHistory>().unwrap().count();
        assert_eq!(
            count_before,
            (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize,
            "Should have {} stale entries before healing",
            TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1
        );

        // Run check_consistency - this should heal by pruning stale entries
        let provider = factory.database_provider_ro().unwrap();
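        // A read-only MDBX provider suffices: healing only reads checkpoints and changeset data
        // through it, while the actual deletions go straight to RocksDB.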
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal via changesets, no unwind needed");

        // Verify all stale entries were pruned
        // After healing, entries with highest_block_number > checkpoint should be gone
        let mut remaining_stale = 0;
        for result in rocksdb.iter::<tables::StoragesHistory>().unwrap() {
            let (key, _) = result.unwrap();
            if key.sharded_key.highest_block_number > CHECKPOINT_BLOCK {
                remaining_stale += 1;
            }
        }
        assert_eq!(
            remaining_stale, 0,
            "All stale entries (block > {}) should be pruned",
            CHECKPOINT_BLOCK
        );
    }

    /// Tests that healing preserves entries at exactly the checkpoint block.
    ///
    /// This catches off-by-one bugs where checkpoint block data is incorrectly deleted.
    #[test]
    fn test_check_consistency_storages_history_preserves_checkpoint_block() {
        use alloy_primitives::U256;
        use reth_db_api::models::StorageBeforeTx;

        const CHECKPOINT_BLOCK: u64 = 100;
        const SF_TIP: u64 = 200;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy()
                .with_storages_history_in_rocksdb(true)
                .with_storage_changesets_in_static_files(true),
        );

        let checkpoint_addr = Address::repeat_byte(0xAA);
        let checkpoint_slot = B256::repeat_byte(0xBB);
        let stale_addr = Address::repeat_byte(0xCC);
        let stale_slot = B256::repeat_byte(0xDD);

        // Write storage changesets to static files
        {
            let sf_provider = factory.static_file_provider();
            let mut writer =
                sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();

            for block_num in 0..=SF_TIP {
                let changeset = if block_num == CHECKPOINT_BLOCK {
                    vec![StorageBeforeTx {
                        address: checkpoint_addr,
                        key: checkpoint_slot,
                        value: U256::from(block_num),
                    }]
                } else if block_num > CHECKPOINT_BLOCK {
                    vec![StorageBeforeTx {
                        address: stale_addr,
                        key: stale_slot,
                        value: U256::from(block_num),
                    }]
                } else {
                    vec![StorageBeforeTx {
                        address: Address::ZERO,
                        key: B256::ZERO,
                        value: U256::ZERO,
                    }]
                };
                writer.append_storage_changeset(changeset, block_num).unwrap();
            }
            writer.commit().unwrap();
        }
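        // Only block 100 touches the checkpoint pair; every block after it touches the stale
        // pair, mirroring the two history entries inserted below.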

        // Set checkpoint
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(
                    StageId::IndexStorageHistory,
                    StageCheckpoint::new(CHECKPOINT_BLOCK),
                )
                .unwrap();
            provider.commit().unwrap();
        }

        // Insert entry AT the checkpoint block (should be preserved)
        let checkpoint_key =
            StorageShardedKey::new(checkpoint_addr, checkpoint_slot, CHECKPOINT_BLOCK);
        let checkpoint_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]);
        rocksdb.put::<tables::StoragesHistory>(checkpoint_key.clone(), &checkpoint_list).unwrap();

        // Insert stale entry AFTER the checkpoint (should be removed)
        let stale_key = StorageShardedKey::new(stale_addr, stale_slot, SF_TIP);
        let stale_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]);
        rocksdb.put::<tables::StoragesHistory>(stale_key.clone(), &stale_list).unwrap();
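        // Both blocks in the stale shard (101 and 200) lie past the checkpoint, so healing is
        // expected to drop the shard entirely rather than truncate it.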

        // Run healing
        let provider = factory.database_provider_ro().unwrap();
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal without unwind");

        // Verify checkpoint block entry is PRESERVED
        let preserved = rocksdb.get::<tables::StoragesHistory>(checkpoint_key).unwrap();
        assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");

        // Verify stale entry is removed
        let stale = rocksdb.get::<tables::StoragesHistory>(stale_key).unwrap();
        assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
    }

    /// Tests `AccountsHistory` changeset-based healing with enough blocks to trigger batching.
    ///
    /// Scenario:
    /// 1. Generate 15,000 blocks worth of account changeset data (to exceed the 10k batch size)
    /// 2. Each block has 1 account change (a single address derived from the block number)
    /// 3. Write account changesets to static files for all 15k blocks
    /// 4. Set `IndexAccountHistory` checkpoint to block 5000
    /// 5. Insert stale `AccountsHistory` entries in `RocksDB` for addresses that changed in blocks
    ///    5001-14999
    /// 6. Run `check_consistency`
    /// 7. Verify:
    ///    - Stale entries for blocks > 5000 are pruned
    ///    - The batching worked (no OOM, completed successfully)
    #[test]
    fn test_check_consistency_accounts_history_heals_via_changesets_large_range() {
        use reth_db::models::AccountBeforeTx;
        use reth_db_api::models::ShardedKey;
        use reth_static_file_types::StaticFileSegment;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::AccountsHistory>()
            .build()
            .unwrap();

        // Create test provider factory
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy()
                .with_account_history_in_rocksdb(true)
                .with_account_changesets_in_static_files(true),
        );

        const TOTAL_BLOCKS: u64 = 15_000;
        const CHECKPOINT_BLOCK: u64 = 5_000;

        // Helper to generate address from block number (avoids pre-allocating 15k addresses)
        #[inline]
        fn make_address(block_num: u64) -> Address {
            let mut addr = Address::ZERO;
            addr.0[0..8].copy_from_slice(&block_num.to_le_bytes());
            addr
        }

        // Write account changesets to static files for all 15k blocks
        {
            let sf_provider = factory.static_file_provider();
            let mut writer =
                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

            // Reuse changeset vec to avoid repeated allocations
            let mut changeset = Vec::with_capacity(1);

            for block_num in 0..TOTAL_BLOCKS {
                changeset.clear();
                changeset.push(AccountBeforeTx { address: make_address(block_num), info: None });
                writer.append_account_changeset(changeset.clone(), block_num).unwrap();
            }

            writer.commit().unwrap();
        }
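        // 15,000 blocks exceed the 10k heal batch size, so the changeset walk above has to be
        // consumed in more than one batch; the test only passes if that batching completes.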

        // Insert stale AccountsHistory entries in RocksDB for addresses that changed
        // in blocks 5001-14999 (i.e., blocks after the checkpoint)
        // These should be pruned by check_consistency
        for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS {
            let key = ShardedKey::new(make_address(block_num), block_num);
            let block_list = BlockNumberList::new_pre_sorted([block_num]);
            rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
        }

        // Also insert some valid entries for blocks <= 5000 that should NOT be pruned
        for block_num in [100u64, 500, 1000, 2500, 5000] {
            let key = ShardedKey::new(make_address(block_num), block_num);
            let block_list = BlockNumberList::new_pre_sorted([block_num]);
            rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
        }
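        // These five shards are keyed at or below the checkpoint, so healing must leave them
        // untouched; only entries keyed above block 5000 are expected to disappear.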

        // Verify we have entries before healing
        let entries_before: usize = rocksdb.iter::<tables::AccountsHistory>().unwrap().count();
        let stale_count = (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize;
        let valid_count = 5usize;
        assert_eq!(
            entries_before,
            stale_count + valid_count,
            "Should have {} stale + {} valid entries before healing",
            stale_count,
            valid_count
        );

        // Set IndexAccountHistory checkpoint to block 5000
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(
                    StageId::IndexAccountHistory,
                    StageCheckpoint::new(CHECKPOINT_BLOCK),
                )
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // Verify sf_tip > checkpoint
        let sf_tip = provider
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
            .unwrap();
        assert_eq!(sf_tip, TOTAL_BLOCKS - 1, "Static file tip should be 14999");
        assert!(sf_tip > CHECKPOINT_BLOCK, "sf_tip should be > checkpoint to trigger healing");

        // Run check_consistency - this should trigger batched changeset-based healing
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Healing should succeed without requiring unwind");

        // Verify: all stale entries for blocks > 5000 should be pruned
        // Count remaining entries with highest_block_number > checkpoint
        let mut remaining_stale = 0;
        for result in rocksdb.iter::<tables::AccountsHistory>().unwrap() {
            let (key, _) = result.unwrap();
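            // `u64::MAX` conventionally keys the open (most recent) shard, so a key at
            // `u64::MAX` is not counted as stale even though it sorts above the checkpoint.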
            if key.highest_block_number > CHECKPOINT_BLOCK && key.highest_block_number != u64::MAX {
                remaining_stale += 1;
            }
        }
        assert_eq!(
            remaining_stale, 0,
            "All stale entries (block > {}) should be pruned",
            CHECKPOINT_BLOCK
        );
    }

    /// Tests that accounts history healing preserves entries at exactly the checkpoint block.
    #[test]
    fn test_check_consistency_accounts_history_preserves_checkpoint_block() {
        use reth_db::models::AccountBeforeTx;
        use reth_db_api::models::ShardedKey;

        const CHECKPOINT_BLOCK: u64 = 100;
        const SF_TIP: u64 = 200;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::AccountsHistory>()
            .build()
            .unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy()
                .with_account_history_in_rocksdb(true)
                .with_account_changesets_in_static_files(true),
        );

        let checkpoint_addr = Address::repeat_byte(0xAA);
        let stale_addr = Address::repeat_byte(0xCC);

        // Write account changesets to static files
        {
            let sf_provider = factory.static_file_provider();
            let mut writer =
                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

            for block_num in 0..=SF_TIP {
                let changeset = if block_num == CHECKPOINT_BLOCK {
                    vec![AccountBeforeTx { address: checkpoint_addr, info: None }]
                } else if block_num > CHECKPOINT_BLOCK {
                    vec![AccountBeforeTx { address: stale_addr, info: None }]
                } else {
                    vec![AccountBeforeTx { address: Address::ZERO, info: None }]
                };
                writer.append_account_changeset(changeset, block_num).unwrap();
            }
            writer.commit().unwrap();
        }
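        // Only block 100 touches `checkpoint_addr`; every later block touches `stale_addr`,
        // mirroring the two history entries inserted below.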

        // Set checkpoint
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(
                    StageId::IndexAccountHistory,
                    StageCheckpoint::new(CHECKPOINT_BLOCK),
                )
                .unwrap();
            provider.commit().unwrap();
        }

        // Insert entry AT the checkpoint block (should be preserved)
        let checkpoint_key = ShardedKey::new(checkpoint_addr, CHECKPOINT_BLOCK);
        let checkpoint_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]);
        rocksdb.put::<tables::AccountsHistory>(checkpoint_key.clone(), &checkpoint_list).unwrap();

        // Insert stale entry AFTER the checkpoint (should be removed)
        let stale_key = ShardedKey::new(stale_addr, SF_TIP);
        let stale_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]);
        rocksdb.put::<tables::AccountsHistory>(stale_key.clone(), &stale_list).unwrap();

        // Run healing
        let provider = factory.database_provider_ro().unwrap();
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal without unwind");

        // Verify checkpoint block entry is PRESERVED
        let preserved = rocksdb.get::<tables::AccountsHistory>(checkpoint_key).unwrap();
        assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted");

        // Verify stale entry is removed
        let stale = rocksdb.get::<tables::AccountsHistory>(stale_key).unwrap();
        assert!(stale.is_none(), "Stale entry after checkpoint should be removed");
    }

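    /// Tests that neither healing nor an unwind is triggered when the static file tip equals
    /// the checkpoint; existing `StoragesHistory` entries must be left intact.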
    #[test]
    fn test_check_consistency_storages_history_sf_tip_equals_checkpoint_no_action() {
        use alloy_primitives::U256;
        use reth_db::models::StorageBeforeTx;
        use reth_static_file_types::StaticFileSegment;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        // Insert StoragesHistory entries into RocksDB
        let key1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
        let key2 = StorageShardedKey::new(Address::random(), B256::random(), 80);
        let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
        let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 80]);
        rocksdb.put::<tables::StoragesHistory>(key1, &block_list1).unwrap();
        rocksdb.put::<tables::StoragesHistory>(key2, &block_list2).unwrap();
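        // Both shards only index blocks up to 80, below the checkpoint of 100 configured later,
        // so none of this data is stale.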

        // Capture entries before consistency check
        let entries_before: Vec<_> =
            rocksdb.iter::<tables::StoragesHistory>().unwrap().map(|r| r.unwrap()).collect();

        // Create a test provider factory
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_storages_history_in_rocksdb(true),
        );

        // Write storage changesets to static files for blocks 0-100
        {
            let sf_provider = factory.static_file_provider();
            let mut writer =
                sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();

            for block_num in 0..=100u64 {
                let changeset = vec![StorageBeforeTx {
                    address: Address::ZERO,
                    key: B256::with_last_byte(block_num as u8),
                    value: U256::from(block_num),
                }];
                writer.append_storage_changeset(changeset, block_num).unwrap();
            }
            writer.commit().unwrap();
        }

        // Set IndexStorageHistory checkpoint to block 100 (same as sf_tip)
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }
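        // sf_tip == checkpoint, so the "sf_tip > checkpoint" condition that triggers healing is
        // never met and the table should pass through untouched.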

        let provider = factory.database_provider_ro().unwrap();

        // Verify sf_tip equals checkpoint (both at 100)
        let sf_tip = provider
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
            .unwrap();
        assert_eq!(sf_tip, 100, "Static file tip should be 100");

        // Run check_consistency - should return None (no unwind needed)
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "sf_tip == checkpoint should not require unwind");

        // Verify NO entries are deleted - RocksDB state unchanged
        let entries_after: Vec<_> =
            rocksdb.iter::<tables::StoragesHistory>().unwrap().map(|r| r.unwrap()).collect();

        assert_eq!(
            entries_after.len(),
            entries_before.len(),
            "RocksDB entry count should be unchanged when sf_tip == checkpoint"
        );

        // Verify exact entries are preserved
        for (before, after) in entries_before.iter().zip(entries_after.iter()) {
            assert_eq!(before.0, after.0, "Entry key should be unchanged");
            assert_eq!(before.1, after.1, "Entry block list should be unchanged");
        }
    }
}