reth_provider/providers/rocksdb/invariants.rs

//! Invariant checking for `RocksDB` tables.
//!
//! This module provides consistency checks for tables stored in `RocksDB`, similar to the
//! consistency checks for static files. The goal is to detect and potentially heal
//! inconsistencies between `RocksDB` data and MDBX checkpoints.

use super::RocksDBProvider;
use crate::StaticFileProviderFactory;
use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::BlockNumber;
use rayon::prelude::*;
use reth_db::cursor::DbCursorRO;
use reth_db_api::{tables, transaction::DbTx};
use reth_stages_types::StageId;
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{
    DBProvider, StageCheckpointReader, StorageSettingsCache, TransactionsProvider,
};
use reth_storage_errors::provider::ProviderResult;

impl RocksDBProvider {
    /// Checks consistency of `RocksDB` tables against MDBX stage checkpoints.
    ///
    /// Returns an unwind target block number if the pipeline needs to unwind to rebuild
    /// `RocksDB` data. Returns `None` if all invariants pass or if inconsistencies were healed.
    ///
    /// # Invariants checked
    ///
    /// For `TransactionHashNumbers`:
    /// - The maximum `TxNumber` value should not exceed what the `TransactionLookup` stage
    ///   checkpoint indicates has been processed.
    /// - If `RocksDB` is ahead, excess entries are pruned (healed).
    /// - If `RocksDB` is behind, an unwind is required.
    ///
    /// For `AccountsHistory` and `StoragesHistory`:
    /// - The maximum block number in shards should not exceed the corresponding index stage
    ///   checkpoint (`IndexAccountHistory` / `IndexStorageHistory`).
    /// - Similar healing/unwind logic applies.
    ///
    /// # Requirements
    ///
    /// For pruning `TransactionHashNumbers`, the provider must be able to supply transaction
    /// data (typically from static files) so that transaction hashes can be computed. This
    /// implies that static files should be ahead of or in sync with `RocksDB`.
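    ///
    /// # Example
    ///
    /// A minimal sketch of how a caller might act on the result (not a compiled doctest;
    /// `rocksdb`, `provider`, and `pipeline` are illustrative bindings, not APIs defined
    /// here):
    ///
    /// ```ignore
    /// if let Some(target) = rocksdb.check_consistency(&provider)? {
    ///     // Inconsistencies could not be healed in place: unwind the pipeline to
    ///     // `target` so the affected RocksDB tables are rebuilt from that block.
    ///     pipeline.unwind(target)?;
    /// }
    /// ```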
    pub fn check_consistency<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider
            + StageCheckpointReader
            + StorageSettingsCache
            + StaticFileProviderFactory
            + TransactionsProvider<Transaction: Encodable2718>,
    {
        let mut unwind_target: Option<BlockNumber> = None;

        // Check TransactionHashNumbers if stored in RocksDB
        if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb &&
            let Some(target) = self.check_transaction_hash_numbers(provider)?
        {
            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
        }

        // Check StoragesHistory if stored in RocksDB
        if provider.cached_storage_settings().storages_history_in_rocksdb &&
            let Some(target) = self.check_storages_history(provider)?
        {
            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
        }

        // Check AccountsHistory if stored in RocksDB
        if provider.cached_storage_settings().account_history_in_rocksdb &&
            let Some(target) = self.check_accounts_history(provider)?
        {
            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
        }

        Ok(unwind_target)
    }

    /// Checks invariants for the `TransactionHashNumbers` table.
    ///
    /// Returns a block number to unwind to if MDBX is behind the checkpoint.
    /// If static files are ahead of MDBX, excess `RocksDB` entries are pruned (healed).
    ///
    /// # Approach
    ///
    /// Instead of iterating `RocksDB` entries (which is expensive and doesn't give us the
    /// tx range we need), we use static files and MDBX to determine what needs pruning,
    /// as summarized in the table below:
    /// - Static files are committed before `RocksDB`, so they're at least at the same height
    /// - MDBX `TransactionBlocks` tells us what's been fully committed
    /// - If static files have more transactions than MDBX, prune the excess range
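    ///
    /// Decision table, where `checkpoint` is the `TransactionLookup` stage checkpoint:
    ///
    /// | MDBX `TransactionBlocks` last entry | Static files           | Action |
    /// |-------------------------------------|------------------------|--------|
    /// | `Some((tx, block))`                 | ahead (`highest > tx`) | prune `tx + 1..=highest`; unwind to `block` if `checkpoint > block` |
    /// | `Some((tx, block))`                 | in sync or behind      | unwind to `block` if `checkpoint > block` |
    /// | `None`                              | has data               | prune `0..=highest` |
    /// | `None`                              | empty                  | unwind to `0` if `checkpoint > 0` |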
    fn check_transaction_hash_numbers<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider
            + StageCheckpointReader
            + StaticFileProviderFactory
            + TransactionsProvider<Transaction: Encodable2718>,
    {
        // Get the TransactionLookup stage checkpoint
        let checkpoint = provider
            .get_stage_checkpoint(StageId::TransactionLookup)?
            .map(|cp| cp.block_number)
            .unwrap_or(0);

        // Get last tx_num from MDBX - this tells us what MDBX has fully committed
        let mut cursor = provider.tx_ref().cursor_read::<tables::TransactionBlocks>()?;
        let mdbx_last = cursor.last()?;

        // Get highest tx_num from static files - this tells us what tx data is available
        let highest_static_tx = provider
            .static_file_provider()
            .get_highest_static_file_tx(StaticFileSegment::Transactions);

        match (mdbx_last, highest_static_tx) {
            (Some((mdbx_tx, mdbx_block)), Some(highest_tx)) if highest_tx > mdbx_tx => {
                // Static files are ahead of MDBX - prune RocksDB entries for the excess range.
                // This is the common case when recovering from a crash that occurred mid-unwind.
                tracing::info!(
                    target: "reth::providers::rocksdb",
                    mdbx_last_tx = mdbx_tx,
                    mdbx_block,
                    highest_static_tx = highest_tx,
                    "Static files ahead of MDBX, pruning TransactionHashNumbers excess data"
                );
                self.prune_transaction_hash_numbers_in_range(provider, (mdbx_tx + 1)..=highest_tx)?;

                // After pruning, check if MDBX is behind the checkpoint
                if checkpoint > mdbx_block {
                    tracing::warn!(
                        target: "reth::providers::rocksdb",
                        mdbx_block,
                        checkpoint,
                        "MDBX behind checkpoint after pruning, unwind needed"
                    );
                    return Ok(Some(mdbx_block));
                }
            }
            (Some((_mdbx_tx, mdbx_block)), _) => {
                // MDBX and static files are in sync (or static files don't have more data).
                // Check if MDBX is behind the checkpoint.
                if checkpoint > mdbx_block {
                    tracing::warn!(
                        target: "reth::providers::rocksdb",
                        mdbx_block,
                        checkpoint,
                        "MDBX behind checkpoint, unwind needed"
                    );
                    return Ok(Some(mdbx_block));
                }
            }
            (None, Some(highest_tx)) => {
                // MDBX has no transactions but static files have data.
                // This means RocksDB might have stale entries; prune them all.
                tracing::info!(
                    target: "reth::providers::rocksdb",
                    highest_static_tx = highest_tx,
                    "MDBX empty but static files have data, pruning all TransactionHashNumbers"
                );
                self.prune_transaction_hash_numbers_in_range(provider, 0..=highest_tx)?;
            }
            (None, None) => {
                // Both MDBX and static files are empty.
                // If the checkpoint says we should have data, that's an inconsistency.
                if checkpoint > 0 {
                    tracing::warn!(
                        target: "reth::providers::rocksdb",
                        checkpoint,
                        "Checkpoint set but no transaction data exists, unwind needed"
                    );
                    return Ok(Some(0));
                }
            }
        }

        Ok(None)
    }

    /// Prunes `TransactionHashNumbers` entries for transactions in the given range.
    ///
    /// This fetches transactions from the provider, computes their hashes in parallel,
    /// and deletes the corresponding entries from `RocksDB` by key. This approach is more
    /// scalable than iterating all rows because it only processes the transactions that
    /// need to be pruned.
    ///
    /// # Requirements
    ///
    /// The provider must be able to supply transaction data (typically from static files)
    /// so that transaction hashes can be computed. This implies that static files should
    /// be ahead of or in sync with `RocksDB`.
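    ///
    /// # Example
    ///
    /// A sketch of healing after an MDBX unwind (not a compiled doctest; the bounds are
    /// illustrative and mirror the crash-recovery scenario in the tests below):
    ///
    /// ```ignore
    /// // MDBX's last committed tx is 5, but static files go up to tx 11: drop the
    /// // RocksDB hash entries for the six excess transactions.
    /// self.prune_transaction_hash_numbers_in_range(provider, 6..=11)?;
    /// ```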
    fn prune_transaction_hash_numbers_in_range<Provider>(
        &self,
        provider: &Provider,
        tx_range: std::ops::RangeInclusive<u64>,
    ) -> ProviderResult<()>
    where
        Provider: TransactionsProvider<Transaction: Encodable2718>,
    {
        if tx_range.is_empty() {
            return Ok(());
        }

        // Fetch transactions in the range and compute their hashes in parallel
        let hashes: Vec<_> = provider
            .transactions_by_tx_range(tx_range.clone())?
            .into_par_iter()
            .map(|tx| tx.trie_hash())
            .collect();

        if !hashes.is_empty() {
            tracing::info!(
                target: "reth::providers::rocksdb",
                deleted_count = hashes.len(),
                tx_range_start = *tx_range.start(),
                tx_range_end = *tx_range.end(),
                "Pruning TransactionHashNumbers entries by tx range"
            );

            let mut batch = self.batch();
            for hash in hashes {
                batch.delete::<tables::TransactionHashNumbers>(hash)?;
            }
            batch.commit()?;
        }

        Ok(())
    }

    /// Checks invariants for the `StoragesHistory` table.
    ///
    /// Returns a block number to unwind to if `RocksDB` is behind the checkpoint.
    /// If `RocksDB` is ahead of the checkpoint, excess entries are pruned (healed).
    fn check_storages_history<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + StageCheckpointReader,
    {
        // Get the IndexStorageHistory stage checkpoint
        let checkpoint = provider
            .get_stage_checkpoint(StageId::IndexStorageHistory)?
            .map(|cp| cp.block_number)
            .unwrap_or(0);

        // Check if RocksDB has any data
        let rocks_first = self.first::<tables::StoragesHistory>()?;

        match rocks_first {
            Some(_) => {
                // If checkpoint is 0 but we have data, clear everything
                if checkpoint == 0 {
                    tracing::info!(
                        target: "reth::providers::rocksdb",
                        "StoragesHistory has data but checkpoint is 0, clearing all"
                    );
                    self.prune_storages_history_above(0)?;
                    return Ok(None);
                }

                // Find the max highest_block_number (excluding u64::MAX sentinel) across all
                // entries
                let mut max_highest_block = 0u64;
                for result in self.iter::<tables::StoragesHistory>()? {
                    let (key, _) = result?;
                    let highest = key.sharded_key.highest_block_number;
                    if highest != u64::MAX && highest > max_highest_block {
                        max_highest_block = highest;
                    }
                }

                // If any entry has highest_block > checkpoint, prune excess
                if max_highest_block > checkpoint {
                    tracing::info!(
                        target: "reth::providers::rocksdb",
                        rocks_highest = max_highest_block,
                        checkpoint,
                        "StoragesHistory ahead of checkpoint, pruning excess data"
                    );
                    self.prune_storages_history_above(checkpoint)?;
                } else if max_highest_block < checkpoint {
                    // RocksDB is behind checkpoint, return highest block to signal unwind needed
                    tracing::warn!(
                        target: "reth::providers::rocksdb",
                        rocks_highest = max_highest_block,
                        checkpoint,
                        "StoragesHistory behind checkpoint, unwind needed"
                    );
                    return Ok(Some(max_highest_block));
                }

                Ok(None)
            }
            None => {
                // Empty RocksDB table
                if checkpoint > 0 {
                    // Stage says we should have data but we don't
                    return Ok(Some(0));
                }
                Ok(None)
            }
        }
    }

    /// Prunes `StoragesHistory` entries where `highest_block_number` > `max_block`.
    ///
    /// For `StoragesHistory`, the key contains `highest_block_number`, so we can iterate
    /// and delete entries where `key.sharded_key.highest_block_number > max_block`.
    ///
    /// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
    /// which is inefficient. Use changeset-based pruning instead.
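    ///
    /// Note that shards keyed with the `u64::MAX` sentinel (the open "latest" shard) are
    /// never pruned by a block cutoff; they are only removed on a full clear
    /// (`max_block == 0`).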
    fn prune_storages_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
        use reth_db_api::models::storage_sharded_key::StorageShardedKey;

        let mut to_delete: Vec<StorageShardedKey> = Vec::new();
        for result in self.iter::<tables::StoragesHistory>()? {
            let (key, _) = result?;
            let highest_block = key.sharded_key.highest_block_number;
            if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
                to_delete.push(key);
            }
        }

        let deleted = to_delete.len();
        if deleted > 0 {
            tracing::info!(
                target: "reth::providers::rocksdb",
                deleted_count = deleted,
                max_block,
                "Pruning StoragesHistory entries"
            );

            let mut batch = self.batch();
            for key in to_delete {
                batch.delete::<tables::StoragesHistory>(key)?;
            }
            batch.commit()?;
        }

        Ok(())
    }

    /// Checks invariants for the `AccountsHistory` table.
    ///
    /// Returns a block number to unwind to if `RocksDB` is behind the checkpoint.
    /// If `RocksDB` is ahead of the checkpoint, excess entries are pruned (healed).
    fn check_accounts_history<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + StageCheckpointReader,
    {
        // Get the IndexAccountHistory stage checkpoint
        let checkpoint = provider
            .get_stage_checkpoint(StageId::IndexAccountHistory)?
            .map(|cp| cp.block_number)
            .unwrap_or(0);

        // Check if RocksDB has any data
        let rocks_first = self.first::<tables::AccountsHistory>()?;

        match rocks_first {
            Some(_) => {
                // If checkpoint is 0 but we have data, clear everything
                if checkpoint == 0 {
                    tracing::info!(
                        target: "reth::providers::rocksdb",
                        "AccountsHistory has data but checkpoint is 0, clearing all"
                    );
                    self.prune_accounts_history_above(0)?;
                    return Ok(None);
                }

                // Find the max highest_block_number (excluding u64::MAX sentinel) across all
                // entries
                let mut max_highest_block = 0u64;
                for result in self.iter::<tables::AccountsHistory>()? {
                    let (key, _) = result?;
                    let highest = key.highest_block_number;
                    if highest != u64::MAX && highest > max_highest_block {
                        max_highest_block = highest;
                    }
                }

                // If any entry has highest_block > checkpoint, prune excess
                if max_highest_block > checkpoint {
                    tracing::info!(
                        target: "reth::providers::rocksdb",
                        rocks_highest = max_highest_block,
                        checkpoint,
                        "AccountsHistory ahead of checkpoint, pruning excess data"
                    );
                    self.prune_accounts_history_above(checkpoint)?;
                    return Ok(None);
                }

                // If RocksDB is behind the checkpoint, request an unwind to rebuild.
                if max_highest_block < checkpoint {
                    tracing::warn!(
                        target: "reth::providers::rocksdb",
                        rocks_highest = max_highest_block,
                        checkpoint,
                        "AccountsHistory behind checkpoint, unwind needed"
                    );
                    return Ok(Some(max_highest_block));
                }

                Ok(None)
            }
            None => {
                // Empty RocksDB table
                if checkpoint > 0 {
                    // Stage says we should have data but we don't
                    return Ok(Some(0));
                }
                Ok(None)
            }
        }
    }

    /// Prunes `AccountsHistory` entries where `highest_block_number` > `max_block`.
    ///
    /// For `AccountsHistory`, the key is `ShardedKey<Address>` which contains
    /// `highest_block_number`, so we can iterate and delete entries where
    /// `key.highest_block_number > max_block`.
    ///
    /// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
    /// which is inefficient. Use changeset-based pruning instead.
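    ///
    /// As with [`Self::prune_storages_history_above`], shards keyed with the `u64::MAX`
    /// sentinel are only removed on a full clear (`max_block == 0`).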
    fn prune_accounts_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
        use alloy_primitives::Address;
        use reth_db_api::models::ShardedKey;

        let mut to_delete: Vec<ShardedKey<Address>> = Vec::new();
        for result in self.iter::<tables::AccountsHistory>()? {
            let (key, _) = result?;
            let highest_block = key.highest_block_number;
            if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
                to_delete.push(key);
            }
        }

        let deleted = to_delete.len();
        if deleted > 0 {
            tracing::info!(
                target: "reth::providers::rocksdb",
                deleted_count = deleted,
                max_block,
                "Pruning AccountsHistory entries"
            );

            let mut batch = self.batch();
            for key in to_delete {
                batch.delete::<tables::AccountsHistory>(key)?;
            }
            batch.commit()?;
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        providers::rocksdb::RocksDBBuilder, test_utils::create_test_provider_factory, BlockWriter,
        DatabaseProviderFactory, StageCheckpointWriter, TransactionsProvider,
    };
    use alloy_primitives::{Address, B256};
    use reth_db::cursor::DbCursorRW;
    use reth_db_api::{
        models::{storage_sharded_key::StorageShardedKey, StorageSettings},
        tables::{self, BlockNumberList},
        transaction::DbTxMut,
    };
    use reth_stages_types::StageCheckpoint;
    use reth_testing_utils::generators::{self, BlockRangeParams};
    use tempfile::TempDir;

    #[test]
    fn test_first_last_empty_rocksdb() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        // Empty RocksDB, no checkpoints - should be consistent
        let first = provider.first::<tables::TransactionHashNumbers>().unwrap();
        let last = provider.last::<tables::TransactionHashNumbers>().unwrap();

        assert!(first.is_none());
        assert!(last.is_none());
    }

    #[test]
    fn test_first_last_with_data() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        // Insert some data
        let tx_hash = B256::from([1u8; 32]);
        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();

        // RocksDB has data
        let last = provider.last::<tables::TransactionHashNumbers>().unwrap();
        assert!(last.is_some());
        assert_eq!(last.unwrap().1, 100);
    }

    #[test]
    fn test_check_consistency_empty_rocksdb_no_checkpoint_is_ok() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy()
                .with_transaction_hash_numbers_in_rocksdb(true)
                .with_storages_history_in_rocksdb(true),
        );

        let provider = factory.database_provider_ro().unwrap();

        // Empty RocksDB and no checkpoints - should be consistent (None = no unwind needed)
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None);
    }

    #[test]
    fn test_check_consistency_empty_rocksdb_with_checkpoint_needs_unwind() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
        );

        // Set a checkpoint indicating we should have processed up to block 100
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB is empty but checkpoint says block 100 was processed
        // This means RocksDB is missing data and we need to unwind to rebuild
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild RocksDB");
    }

    #[test]
    fn test_check_consistency_mdbx_empty_static_files_have_data_prunes_rocksdb() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
        );

        // Generate blocks with real transactions and insert them
        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=2,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        let mut tx_hashes = Vec::new();
        {
            let provider = factory.database_provider_rw().unwrap();
            let mut tx_count = 0u64;
            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    tx_hashes.push(hash);
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }
            provider.commit().unwrap();
        }

        // Simulate crash recovery: MDBX was reset but static files and RocksDB still have data.
        // Clear TransactionBlocks to simulate empty MDBX state.
        {
            let provider = factory.database_provider_rw().unwrap();
            let mut cursor = provider.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();
            let mut to_delete = Vec::new();
            let mut walker = cursor.walk(Some(0)).unwrap();
            while let Some((tx_num, _)) = walker.next().transpose().unwrap() {
                to_delete.push(tx_num);
            }
            drop(walker);
            for tx_num in to_delete {
                cursor.seek_exact(tx_num).unwrap();
                cursor.delete_current().unwrap();
            }
            // No checkpoint set (checkpoint = 0)
            provider.commit().unwrap();
        }

        // Verify RocksDB data exists
        assert!(rocksdb.last::<tables::TransactionHashNumbers>().unwrap().is_some());

        let provider = factory.database_provider_ro().unwrap();

        // MDBX TransactionBlocks is empty, but static files have transaction data.
        // This means RocksDB has stale data that should be pruned (healed).
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        // Verify data was pruned
        for hash in &tx_hashes {
            assert!(
                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
                "RocksDB should be empty after pruning"
            );
        }
    }

    #[test]
    fn test_check_consistency_storages_history_empty_with_checkpoint_needs_unwind() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_storages_history_in_rocksdb(true),
        );

        // Set a checkpoint indicating we should have processed up to block 100
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB is empty but checkpoint says block 100 was processed
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild StoragesHistory");
    }

    #[test]
    fn test_check_consistency_storages_history_has_data_no_checkpoint_prunes_data() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        // Insert data into RocksDB
        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
        rocksdb.put::<tables::StoragesHistory>(key, &block_list).unwrap();

        // Verify data exists
        assert!(rocksdb.last::<tables::StoragesHistory>().unwrap().is_some());

        // Create a test provider factory for MDBX with NO checkpoint
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_storages_history_in_rocksdb(true),
        );

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB has data but checkpoint is 0
        // This means RocksDB has stale data that should be pruned (healed)
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        // Verify data was pruned
        assert!(
            rocksdb.last::<tables::StoragesHistory>().unwrap().is_none(),
            "RocksDB should be empty after pruning"
        );
    }

    #[test]
    fn test_check_consistency_storages_history_behind_checkpoint_needs_unwind() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        // Insert data into RocksDB with max highest_block_number = 80
        let key_block_50 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
        let key_block_80 = StorageShardedKey::new(Address::ZERO, B256::from([1u8; 32]), 80);
        let key_block_max = StorageShardedKey::new(Address::ZERO, B256::from([2u8; 32]), u64::MAX);

        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
        rocksdb.put::<tables::StoragesHistory>(key_block_50, &block_list).unwrap();
        rocksdb.put::<tables::StoragesHistory>(key_block_80, &block_list).unwrap();
        rocksdb.put::<tables::StoragesHistory>(key_block_max, &block_list).unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_storages_history_in_rocksdb(true),
        );

        // Set checkpoint to block 100
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB max highest_block (80) is behind checkpoint (100)
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(80), "Should unwind to the highest block present in RocksDB");
    }

    #[test]
    fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
        );

        // Generate blocks with real transactions (blocks 0-2, 6 transactions total)
        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=2,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        {
            let provider = factory.database_provider_rw().unwrap();
            let mut tx_count = 0u64;
            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }
            provider.commit().unwrap();
        }

        // Now simulate a scenario where the checkpoint is ahead of MDBX.
        // This happens when the checkpoint was saved but MDBX data was lost/corrupted.
        // Set checkpoint to block 10 (beyond our actual data at block 2)
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(10))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // MDBX has data up to block 2, but checkpoint says block 10 was processed.
        // The static files highest tx matches MDBX last tx (both at block 2).
        // Checkpoint > mdbx_block means we need to unwind to rebuild.
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(
            result,
            Some(2),
            "Should require unwind to block 2 (MDBX's last block) to rebuild from checkpoint"
        );
    }

    #[test]
    fn test_check_consistency_rocksdb_ahead_of_checkpoint_prunes_excess() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
        );

        // Generate blocks with real transactions:
        // Blocks 0-5, each with 2 transactions = 12 total transactions (0-11)
        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=5,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        // Track which hashes belong to which blocks
        let mut tx_hashes = Vec::new();
        let mut tx_count = 0u64;
        {
            let provider = factory.database_provider_rw().unwrap();
            // Insert ALL blocks (0-5) to write transactions to static files
            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    tx_hashes.push(hash);
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }
            provider.commit().unwrap();
        }

        // Simulate crash recovery scenario:
        // MDBX was unwound to block 2, but RocksDB and static files still have more data.
        // Remove TransactionBlocks entries for blocks 3-5 to simulate MDBX unwind.
        {
            let provider = factory.database_provider_rw().unwrap();
            // Delete TransactionBlocks entries for tx > 5 (i.e., for blocks 3-5)
            // TransactionBlocks maps last_tx_in_block -> block_number
            // After unwind, only entries for blocks 0-2 should remain (tx 5 -> block 2)
            let mut cursor = provider.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();
            // Walk and delete entries where block > 2
            let mut to_delete = Vec::new();
            let mut walker = cursor.walk(Some(0)).unwrap();
            while let Some((tx_num, block_num)) = walker.next().transpose().unwrap() {
                if block_num > 2 {
                    to_delete.push(tx_num);
                }
            }
            drop(walker);
            for tx_num in to_delete {
                cursor.seek_exact(tx_num).unwrap();
                cursor.delete_current().unwrap();
            }

            // Set checkpoint to block 2
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB has tx hashes for all blocks (0-5)
        // MDBX TransactionBlocks only goes up to tx 5 (block 2)
        // Static files have data for all txs (0-11)
        // This means RocksDB is ahead and should prune entries for tx 6-11
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        // Verify: hashes for blocks 0-2 (tx 0-5) should remain, blocks 3-5 (tx 6-11) should
        // be pruned. First 6 hashes should remain:
        for (i, hash) in tx_hashes.iter().take(6).enumerate() {
            assert!(
                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_some(),
                "tx {} should remain",
                i
            );
        }
        // Last 6 hashes should be pruned
        for (i, hash) in tx_hashes.iter().skip(6).enumerate() {
            assert!(
                rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
                "tx {} should be pruned",
                i + 6
            );
        }
    }

    #[test]
    fn test_check_consistency_storages_history_ahead_of_checkpoint_prunes_excess() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        // Insert data into RocksDB with different highest_block_numbers
        let key_block_50 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
        let key_block_100 = StorageShardedKey::new(Address::ZERO, B256::from([1u8; 32]), 100);
        let key_block_150 = StorageShardedKey::new(Address::ZERO, B256::from([2u8; 32]), 150);
        let key_block_max = StorageShardedKey::new(Address::ZERO, B256::from([3u8; 32]), u64::MAX);

        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
        rocksdb.put::<tables::StoragesHistory>(key_block_50.clone(), &block_list).unwrap();
        rocksdb.put::<tables::StoragesHistory>(key_block_100.clone(), &block_list).unwrap();
        rocksdb.put::<tables::StoragesHistory>(key_block_150.clone(), &block_list).unwrap();
        rocksdb.put::<tables::StoragesHistory>(key_block_max.clone(), &block_list).unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_storages_history_in_rocksdb(true),
        );

        // Set checkpoint to block 100
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB has entries with highest_block = 150 which exceeds checkpoint (100)
        // Should prune entries where highest_block > 100 (but not u64::MAX sentinel)
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        // Verify key_block_150 was pruned, but others remain
        assert!(
            rocksdb.get::<tables::StoragesHistory>(key_block_50).unwrap().is_some(),
            "Entry with highest_block=50 should remain"
        );
        assert!(
            rocksdb.get::<tables::StoragesHistory>(key_block_100).unwrap().is_some(),
            "Entry with highest_block=100 should remain"
        );
        assert!(
            rocksdb.get::<tables::StoragesHistory>(key_block_150).unwrap().is_none(),
            "Entry with highest_block=150 should be pruned"
        );
        assert!(
            rocksdb.get::<tables::StoragesHistory>(key_block_max).unwrap().is_some(),
            "Entry with highest_block=u64::MAX (sentinel) should remain"
        );
    }

    #[test]
    fn test_check_consistency_storages_history_behind_checkpoint_single_entry() {
        use reth_db_api::models::storage_sharded_key::StorageShardedKey;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::StoragesHistory>()
            .build()
            .unwrap();

        // Insert data into RocksDB with highest_block_number below checkpoint
        let key_block_50 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
        rocksdb.put::<tables::StoragesHistory>(key_block_50, &block_list).unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_storages_history_in_rocksdb(true),
        );

        // Set checkpoint to block 100
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB only has data up to block 50, but checkpoint says block 100 was processed
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(
            result,
            Some(50),
            "Should require unwind to block 50 to rebuild StoragesHistory"
        );
    }

    /// Test that pruning works by fetching transactions and computing their hashes,
    /// rather than iterating all rows. This test uses random blocks with unique
    /// transactions so we can verify the correct entries are pruned.
    #[test]
    fn test_prune_transaction_hash_numbers_by_range() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .build()
            .unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
        );

        // Generate random blocks with unique transactions
        // Blocks 0-5 each have 2 transactions = 12 transactions total (tx 0-11)
        let mut rng = generators::rng();
        let blocks = generators::random_block_range(
            &mut rng,
            0..=5,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );

        // Insert blocks into the database
        let mut tx_count = 0u64;
        let mut tx_hashes = Vec::new();
        {
            let provider = factory.database_provider_rw().unwrap();

            for block in &blocks {
                provider
                    .insert_block(&block.clone().try_recover().expect("recover block"))
                    .unwrap();

                // Store transaction hash -> tx_number mappings in RocksDB
                for tx in &block.body().transactions {
                    let hash = tx.trie_hash();
                    tx_hashes.push(hash);
                    rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
                    tx_count += 1;
                }
            }

            // Set checkpoint to block 2 (meaning we should only have tx hashes for blocks 0-2)
            // Blocks 0, 1, 2 have 6 transactions (2 each), so tx 0-5 should remain
            provider
                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
                .unwrap();
            provider.commit().unwrap();
        }

        // At this point:
        // - RocksDB has tx hashes for blocks 0-5 (12 total: 2 per block)
        // - Checkpoint says we only processed up to block 2
        // - We need to prune tx hashes for blocks 3, 4, 5 (tx 6-11)

        // Verify RocksDB has the expected number of entries before pruning
        let rocksdb_count_before: usize =
            rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().count();
        assert_eq!(
            rocksdb_count_before, tx_count as usize,
            "RocksDB should have all {} transaction hashes before pruning",
            tx_count
        );

        let provider = factory.database_provider_ro().unwrap();

        // Verify we can fetch transactions by tx range
        let all_txs = provider.transactions_by_tx_range(0..tx_count).unwrap();
        assert_eq!(all_txs.len(), tx_count as usize, "Should be able to fetch all transactions");

        // Verify the hashes match between what we stored and what we compute from fetched txs
        for (i, tx) in all_txs.iter().enumerate() {
            let computed_hash = tx.trie_hash();
            assert_eq!(
                computed_hash, tx_hashes[i],
                "Hash mismatch for tx {}: stored {:?} vs computed {:?}",
                i, tx_hashes[i], computed_hash
            );
        }

        // Blocks 0, 1, 2 have 2 tx each = 6 tx total (indices 0-5)
        // We want to keep tx 0-5, prune tx 6-11
        let max_tx_to_keep = 5u64;
        let tx_to_prune_start = max_tx_to_keep + 1;

        // Prune transactions 6-11 (blocks 3-5)
        rocksdb
            .prune_transaction_hash_numbers_in_range(&provider, tx_to_prune_start..=(tx_count - 1))
            .expect("prune should succeed");

        // Verify: transactions 0-5 should remain, 6-11 should be pruned
        let mut remaining_count = 0;
        for result in rocksdb.iter::<tables::TransactionHashNumbers>().unwrap() {
            let (_hash, tx_num) = result.unwrap();
            assert!(
                tx_num <= max_tx_to_keep,
                "Transaction {} should have been pruned (> {})",
                tx_num,
                max_tx_to_keep
            );
            remaining_count += 1;
        }
        assert_eq!(
            remaining_count,
            (max_tx_to_keep + 1) as usize,
            "Should have {} transactions (0-{})",
            max_tx_to_keep + 1,
            max_tx_to_keep
        );
    }

    #[test]
    fn test_check_consistency_accounts_history_empty_with_checkpoint_needs_unwind() {
        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::AccountsHistory>()
            .build()
            .unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_account_history_in_rocksdb(true),
        );

        // Set a checkpoint indicating we should have processed up to block 100
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB is empty but checkpoint says block 100 was processed
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild AccountsHistory");
    }

    #[test]
    fn test_check_consistency_accounts_history_has_data_no_checkpoint_prunes_data() {
        use reth_db_api::models::ShardedKey;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::AccountsHistory>()
            .build()
            .unwrap();

        // Insert data into RocksDB
        let key = ShardedKey::new(Address::ZERO, 50);
        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
        rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();

        // Verify data exists
        assert!(rocksdb.last::<tables::AccountsHistory>().unwrap().is_some());

        // Create a test provider factory for MDBX with NO checkpoint
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_account_history_in_rocksdb(true),
        );

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB has data but checkpoint is 0
        // This means RocksDB has stale data that should be pruned (healed)
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        // Verify data was pruned
        assert!(
            rocksdb.last::<tables::AccountsHistory>().unwrap().is_none(),
            "RocksDB should be empty after pruning"
        );
    }

    #[test]
    fn test_check_consistency_accounts_history_ahead_of_checkpoint_prunes_excess() {
        use reth_db_api::models::ShardedKey;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::AccountsHistory>()
            .build()
            .unwrap();

        // Insert data into RocksDB with different highest_block_numbers
        let key_block_50 = ShardedKey::new(Address::ZERO, 50);
        let key_block_100 = ShardedKey::new(Address::random(), 100);
        let key_block_150 = ShardedKey::new(Address::random(), 150);
        let key_block_max = ShardedKey::new(Address::random(), u64::MAX);

        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
        rocksdb.put::<tables::AccountsHistory>(key_block_50.clone(), &block_list).unwrap();
        rocksdb.put::<tables::AccountsHistory>(key_block_100.clone(), &block_list).unwrap();
        rocksdb.put::<tables::AccountsHistory>(key_block_150.clone(), &block_list).unwrap();
        rocksdb.put::<tables::AccountsHistory>(key_block_max.clone(), &block_list).unwrap();

        // Create a test provider factory for MDBX
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_account_history_in_rocksdb(true),
        );

        // Set checkpoint to block 100
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB has entries with highest_block = 150 which exceeds checkpoint (100)
        // Should prune entries where highest_block > 100 (but not u64::MAX sentinel)
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(result, None, "Should heal by pruning, no unwind needed");

        // Verify key_block_150 was pruned, but others remain
        assert!(
            rocksdb.get::<tables::AccountsHistory>(key_block_50).unwrap().is_some(),
            "Entry with highest_block=50 should remain"
        );
        assert!(
            rocksdb.get::<tables::AccountsHistory>(key_block_100).unwrap().is_some(),
            "Entry with highest_block=100 should remain"
        );
        assert!(
            rocksdb.get::<tables::AccountsHistory>(key_block_150).unwrap().is_none(),
            "Entry with highest_block=150 should be pruned"
        );
        assert!(
            rocksdb.get::<tables::AccountsHistory>(key_block_max).unwrap().is_some(),
            "Entry with highest_block=u64::MAX (sentinel) should remain"
        );
    }

    #[test]
    fn test_check_consistency_accounts_history_behind_checkpoint_needs_unwind() {
        use reth_db_api::models::ShardedKey;

        let temp_dir = TempDir::new().unwrap();
        let rocksdb = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::AccountsHistory>()
            .build()
            .unwrap();

        // Insert data into RocksDB with highest_block_number below checkpoint
        let key_block_50 = ShardedKey::new(Address::ZERO, 50);
        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
        rocksdb.put::<tables::AccountsHistory>(key_block_50, &block_list).unwrap();

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_account_history_in_rocksdb(true),
        );

        // Set checkpoint to block 100
        {
            let provider = factory.database_provider_rw().unwrap();
            provider
                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
                .unwrap();
            provider.commit().unwrap();
        }

        let provider = factory.database_provider_ro().unwrap();

        // RocksDB only has data up to block 50, but checkpoint says block 100 was processed
        let result = rocksdb.check_consistency(&provider).unwrap();
        assert_eq!(
            result,
            Some(50),
            "Should require unwind to block 50 to rebuild AccountsHistory"
        );
    }
}