// reth_stages/stages/index_storage_history.rs
1use super::{collect_history_indices, collect_storage_history_indices};
2use crate::{stages::utils::load_storage_history, StageCheckpoint, StageId};
3use reth_config::config::{EtlConfig, IndexHistoryConfig};
4use reth_db_api::{
5    models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress},
6    tables,
7    transaction::DbTxMut,
8    Tables,
9};
10use reth_provider::{
11    DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
12    RocksDBProviderFactory, StaticFileProviderFactory, StorageChangeSetReader,
13    StorageSettingsCache,
14};
15use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
16use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
17use std::fmt::Debug;
18use tracing::info;
19
20/// Stage is indexing history the storage changesets generated in
21/// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information
22/// on index sharding take a look at [`tables::StoragesHistory`].
/// Stage is indexing history the storage changesets generated in
/// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information
/// on index sharding take a look at [`tables::StoragesHistory`].
#[derive(Debug)]
pub struct IndexStorageHistoryStage {
    /// Number of blocks after which the control
    /// flow will be returned to the pipeline for commit.
    ///
    /// Note: in this file the threshold is only consulted by `unwind`;
    /// `execute` processes the full range in one ETL pass.
    pub commit_threshold: u64,
    /// Pruning configuration. When set, the stage skips indexing of the prunable
    /// block range and records a prune checkpoint instead.
    pub prune_mode: Option<PruneMode>,
    /// ETL configuration used by the index collectors.
    pub etl_config: EtlConfig,
}
33
34impl IndexStorageHistoryStage {
35    /// Create new instance of [`IndexStorageHistoryStage`].
36    pub const fn new(
37        config: IndexHistoryConfig,
38        etl_config: EtlConfig,
39        prune_mode: Option<PruneMode>,
40    ) -> Self {
41        Self { commit_threshold: config.commit_threshold, etl_config, prune_mode }
42    }
43}
44
45impl Default for IndexStorageHistoryStage {
46    fn default() -> Self {
47        Self { commit_threshold: 100_000, prune_mode: None, etl_config: EtlConfig::default() }
48    }
49}
50
51impl<Provider> Stage<Provider> for IndexStorageHistoryStage
52where
53    Provider: DBProvider<Tx: DbTxMut>
54        + HistoryWriter
55        + PruneCheckpointReader
56        + PruneCheckpointWriter
57        + StorageSettingsCache
58        + RocksDBProviderFactory
59        + StorageChangeSetReader
60        + StaticFileProviderFactory
61        + reth_provider::NodePrimitivesProvider,
62{
63    /// Return the id of the stage
64    fn id(&self) -> StageId {
65        StageId::IndexStorageHistory
66    }
67
68    /// Execute the stage.
69    fn execute(
70        &mut self,
71        provider: &Provider,
72        mut input: ExecInput,
73    ) -> Result<ExecOutput, StageError> {
74        if let Some((target_prunable_block, prune_mode)) = self
75            .prune_mode
76            .map(|mode| {
77                mode.prune_target_block(
78                    input.target(),
79                    PruneSegment::StorageHistory,
80                    PrunePurpose::User,
81                )
82            })
83            .transpose()?
84            .flatten() &&
85            target_prunable_block > input.checkpoint().block_number
86        {
87            input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
88
89            // Save prune checkpoint only if we don't have one already.
90            // Otherwise, pruner may skip the unpruned range of blocks.
91            if provider.get_prune_checkpoint(PruneSegment::StorageHistory)?.is_none() {
92                provider.save_prune_checkpoint(
93                    PruneSegment::StorageHistory,
94                    PruneCheckpoint {
95                        block_number: Some(target_prunable_block),
96                        tx_number: None,
97                        prune_mode,
98                    },
99                )?;
100            }
101        }
102
103        if input.target_reached() {
104            return Ok(ExecOutput::done(input.checkpoint()))
105        }
106
107        let mut range = input.next_block_range();
108        let first_sync = input.checkpoint().block_number == 0;
109        let use_rocksdb = provider.cached_storage_settings().storage_v2;
110
111        // On first sync we might have history coming from genesis. We clear the table since it's
112        // faster to rebuild from scratch.
113        if first_sync {
114            if use_rocksdb {
115                // Note: RocksDB clear() executes immediately (not deferred to commit like MDBX),
116                // but this is safe for first_sync because if we crash before commit, the
117                // checkpoint stays at 0 and we'll just clear and rebuild again on restart. The
118                // source data (changesets) is intact.
119                provider.rocksdb_provider().clear::<tables::StoragesHistory>()?;
120            } else {
121                provider.tx_ref().clear::<tables::StoragesHistory>()?;
122            }
123            range = 0..=*input.next_block_range().end();
124        }
125
126        info!(target: "sync::stages::index_storage_history::exec", ?first_sync, ?use_rocksdb, "Collecting indices");
127        let collector = if provider.cached_storage_settings().storage_v2 {
128            collect_storage_history_indices(provider, range.clone(), &self.etl_config)?
129        } else {
130            collect_history_indices::<_, tables::StorageChangeSets, tables::StoragesHistory, _>(
131                provider,
132                BlockNumberAddress::range(range.clone()),
133                |AddressStorageKey((address, storage_key)), highest_block_number| {
134                    StorageShardedKey::new(address, storage_key, highest_block_number)
135                },
136                |(key, value)| (key.block_number(), AddressStorageKey((key.address(), value.key))),
137                &self.etl_config,
138            )?
139        };
140
141        info!(target: "sync::stages::index_storage_history::exec", "Loading indices into database");
142
143        provider.with_rocksdb_batch_auto_commit(|rocksdb_batch| {
144            let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?;
145            load_storage_history(collector, first_sync, &mut writer)
146                .map_err(|e| reth_provider::ProviderError::other(Box::new(e)))?;
147            Ok(((), writer.into_raw_rocksdb_batch()))
148        })?;
149
150        if use_rocksdb {
151            provider.commit_pending_rocksdb_batches()?;
152            provider.rocksdb_provider().flush(&[Tables::StoragesHistory.name()])?;
153        }
154
155        Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
156    }
157
158    /// Unwind the stage.
159    fn unwind(
160        &mut self,
161        provider: &Provider,
162        input: UnwindInput,
163    ) -> Result<UnwindOutput, StageError> {
164        let (range, unwind_progress, _) =
165            input.unwind_block_range_with_threshold(self.commit_threshold);
166
167        provider.unwind_storage_history_indices_range(range)?;
168
169        Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
170    }
171}
172
173#[cfg(test)]
174mod tests {
175    use super::*;
176    use crate::test_utils::{
177        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
178        TestStageDB, UnwindStageTestRunner,
179    };
180    use alloy_primitives::{address, b256, Address, BlockNumber, B256, U256};
181    use itertools::Itertools;
182    use reth_db_api::{
183        cursor::DbCursorRO,
184        models::{
185            sharded_key, storage_sharded_key::NUM_OF_INDICES_IN_SHARD, ShardedKey,
186            StoredBlockBodyIndices,
187        },
188        transaction::DbTx,
189        BlockNumberList,
190    };
191    use reth_primitives_traits::StorageEntry;
192    use reth_provider::{providers::StaticFileWriter, DatabaseProviderFactory};
193    use reth_testing_utils::generators::{
194        self, random_block_range, random_changeset_range, random_contract_account_range,
195        BlockRangeParams,
196    };
197    use std::collections::BTreeMap;
198
    // Fixed account and storage slot used by every test case in this module.
    const ADDRESS: Address = address!("0x0000000000000000000000000000000000000001");
    const STORAGE_KEY: B256 =
        b256!("0x0000000000000000000000000000000000000000000000000000000000000001");

    // Highest block number that still fits into a single full shard.
    const LAST_BLOCK_IN_FULL_SHARD: BlockNumber = NUM_OF_INDICES_IN_SHARD as BlockNumber;
    // Highest block number seeded by `partial_setup` (spills past one full shard).
    const MAX_BLOCK: BlockNumber = NUM_OF_INDICES_IN_SHARD as BlockNumber + 2;
205
206    const fn storage(key: B256) -> StorageEntry {
207        // Value is not used in indexing stage.
208        StorageEntry { key, value: U256::ZERO }
209    }
210
211    const fn block_number_address(block_number: u64) -> BlockNumberAddress {
212        BlockNumberAddress((block_number, ADDRESS))
213    }
214
215    /// Shard for account
216    const fn shard(shard_index: u64) -> StorageShardedKey {
217        StorageShardedKey {
218            address: ADDRESS,
219            sharded_key: ShardedKey { key: STORAGE_KEY, highest_block_number: shard_index },
220        }
221    }
222
223    fn list(list: &[u64]) -> BlockNumberList {
224        BlockNumberList::new(list.iter().copied()).unwrap()
225    }
226
227    fn cast(
228        table: Vec<(StorageShardedKey, BlockNumberList)>,
229    ) -> BTreeMap<StorageShardedKey, Vec<u64>> {
230        table
231            .into_iter()
232            .map(|(k, v)| {
233                let v = v.iter().collect();
234                (k, v)
235            })
236            .collect()
237    }
238
239    fn partial_setup(db: &TestStageDB) {
240        // setup
241        db.commit(|tx| {
242            for block in 0..=MAX_BLOCK {
243                tx.put::<tables::BlockBodyIndices>(
244                    block,
245                    StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
246                )?;
247                // setup changeset that is going to be applied to history index
248                tx.put::<tables::StorageChangeSets>(
249                    block_number_address(block),
250                    storage(STORAGE_KEY),
251                )?;
252            }
253            Ok(())
254        })
255        .unwrap()
256    }
257
258    fn run(db: &TestStageDB, run_to: u64, input_checkpoint: Option<BlockNumber>) {
259        let input = ExecInput {
260            target: Some(run_to),
261            checkpoint: input_checkpoint
262                .map(|block_number| StageCheckpoint { block_number, stage_checkpoint: None }),
263        };
264        let mut stage = IndexStorageHistoryStage::default();
265        let provider = db.factory.database_provider_rw().unwrap();
266        let out = stage.execute(&provider, input).unwrap();
267        assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(run_to), done: true });
268        provider.commit().unwrap();
269    }
270
271    fn unwind(db: &TestStageDB, unwind_from: u64, unwind_to: u64) {
272        let input = UnwindInput {
273            checkpoint: StageCheckpoint::new(unwind_from),
274            unwind_to,
275            ..Default::default()
276        };
277        let mut stage = IndexStorageHistoryStage::default();
278        let provider = db.factory.database_provider_rw().unwrap();
279        let out = stage.unwind(&provider, input).unwrap();
280        assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) });
281        provider.commit().unwrap();
282    }
283
    /// Indexes blocks 0..=3 from an empty database, then unwinds back to genesis.
    #[tokio::test]
    async fn insert_index_to_genesis() {
        // init
        let db = TestStageDB::default();

        // setup
        partial_setup(&db);

        // run
        run(&db, 3, None);

        // verify: all four blocks land in the single open shard
        let table = cast(db.table::<tables::StoragesHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0, 1, 2, 3])]));

        // unwind (from a checkpoint past the executed target; the excess range is a no-op)
        unwind(&db, 5, 0);

        // verify initial state: only the genesis index remains
        let table = cast(db.table::<tables::StoragesHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0])]));
    }
306
    /// Resumes indexing into a shard that already holds blocks 1..=3, appending
    /// blocks 4 and 5, then unwinds back to the pre-existing shard contents.
    #[tokio::test]
    async fn insert_index_to_not_empty_shard() {
        // init
        let db = TestStageDB::default();

        // setup: pre-populate the open shard with blocks 1..=3
        partial_setup(&db);
        db.commit(|tx| {
            tx.put::<tables::StoragesHistory>(shard(u64::MAX), list(&[1, 2, 3])).unwrap();
            Ok(())
        })
        .unwrap();

        // run
        run(&db, 5, Some(3));

        // verify
        let table = cast(db.table::<tables::StoragesHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3, 4, 5])]));

        // unwind
        unwind(&db, 5, 3);

        // verify initial state
        let table = cast(db.table::<tables::StoragesHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3])]));
    }
334
    /// Appending past an already-full open shard must freeze it under its highest block
    /// and start a new open shard; unwinding must merge everything back into one shard.
    #[tokio::test]
    async fn insert_index_to_full_shard() {
        // init
        let db = TestStageDB::default();
        // change does not matter only that account is present in changeset.
        let full_list = (1..=LAST_BLOCK_IN_FULL_SHARD).collect::<Vec<_>>();

        // setup
        partial_setup(&db);
        db.commit(|tx| {
            tx.put::<tables::StoragesHistory>(shard(u64::MAX), list(&full_list)).unwrap();
            Ok(())
        })
        .unwrap();

        // run
        run(&db, LAST_BLOCK_IN_FULL_SHARD + 2, Some(LAST_BLOCK_IN_FULL_SHARD));

        // verify: frozen full shard plus a fresh open shard with the two new blocks
        let table = cast(db.table::<tables::StoragesHistory>().unwrap());
        assert_eq!(
            table,
            BTreeMap::from([
                (shard(LAST_BLOCK_IN_FULL_SHARD), full_list.clone()),
                (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1, LAST_BLOCK_IN_FULL_SHARD + 2])
            ])
        );

        // unwind
        unwind(&db, LAST_BLOCK_IN_FULL_SHARD + 2, LAST_BLOCK_IN_FULL_SHARD);

        // verify initial state
        let table = cast(db.table::<tables::StoragesHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), full_list)]));
    }
370
    /// Filling the open shard exactly to capacity must keep a single shard
    /// (no split occurs until the shard overflows).
    #[tokio::test]
    async fn insert_index_to_fill_shard() {
        // init
        let db = TestStageDB::default();
        let mut almost_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 2).collect::<Vec<_>>();

        // setup
        partial_setup(&db);
        db.commit(|tx| {
            tx.put::<tables::StoragesHistory>(shard(u64::MAX), list(&almost_full_list)).unwrap();
            Ok(())
        })
        .unwrap();

        // run
        run(&db, LAST_BLOCK_IN_FULL_SHARD, Some(LAST_BLOCK_IN_FULL_SHARD - 2));

        // verify: shard is now exactly full but still the only (open) shard
        almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD - 1);
        almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD);
        let table = cast(db.table::<tables::StoragesHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list.clone())]));

        // unwind
        unwind(&db, LAST_BLOCK_IN_FULL_SHARD, LAST_BLOCK_IN_FULL_SHARD - 2);

        // verify initial state
        almost_full_list.pop();
        almost_full_list.pop();
        let table = cast(db.table::<tables::StoragesHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list)]));
    }
405
406    #[tokio::test]
407    async fn insert_index_second_half_shard() {
408        // init
409        let db = TestStageDB::default();
410        let mut almost_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 1).collect::<Vec<_>>();
411
412        // setup
413        partial_setup(&db);
414        db.commit(|tx| {
415            tx.put::<tables::StoragesHistory>(shard(u64::MAX), list(&almost_full_list)).unwrap();
416            Ok(())
417        })
418        .unwrap();
419
420        // run
421        run(&db, LAST_BLOCK_IN_FULL_SHARD + 1, Some(LAST_BLOCK_IN_FULL_SHARD - 1));
422
423        // verify
424        almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD);
425        let table = cast(db.table::<tables::StoragesHistory>().unwrap());
426        assert_eq!(
427            table,
428            BTreeMap::from([
429                (shard(LAST_BLOCK_IN_FULL_SHARD), almost_full_list.clone()),
430                (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1])
431            ])
432        );
433
434        // unwind
435        unwind(&db, LAST_BLOCK_IN_FULL_SHARD, LAST_BLOCK_IN_FULL_SHARD - 1);
436
437        // verify initial state
438        almost_full_list.pop();
439        let table = cast(db.table::<tables::StoragesHistory>().unwrap());
440        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list)]));
441    }
442
    /// With two frozen shards already present, new blocks must keep appending to the
    /// open shard without disturbing the frozen ones; unwinding restores the open shard.
    #[tokio::test]
    async fn insert_index_to_third_shard() {
        // init
        let db = TestStageDB::default();
        let full_list = (1..=LAST_BLOCK_IN_FULL_SHARD).collect::<Vec<_>>();

        // setup: two frozen full shards (keys 1 and 2) plus an open shard with one block
        partial_setup(&db);
        db.commit(|tx| {
            tx.put::<tables::StoragesHistory>(shard(1), list(&full_list)).unwrap();
            tx.put::<tables::StoragesHistory>(shard(2), list(&full_list)).unwrap();
            tx.put::<tables::StoragesHistory>(
                shard(u64::MAX),
                list(&[LAST_BLOCK_IN_FULL_SHARD + 1]),
            )
            .unwrap();
            Ok(())
        })
        .unwrap();

        run(&db, LAST_BLOCK_IN_FULL_SHARD + 2, Some(LAST_BLOCK_IN_FULL_SHARD + 1));

        // verify
        let table = cast(db.table::<tables::StoragesHistory>().unwrap());
        assert_eq!(
            table,
            BTreeMap::from([
                (shard(1), full_list.clone()),
                (shard(2), full_list.clone()),
                (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1, LAST_BLOCK_IN_FULL_SHARD + 2])
            ])
        );

        // unwind
        unwind(&db, LAST_BLOCK_IN_FULL_SHARD + 2, LAST_BLOCK_IN_FULL_SHARD + 1);

        // verify initial state
        let table = cast(db.table::<tables::StoragesHistory>().unwrap());
        assert_eq!(
            table,
            BTreeMap::from([
                (shard(1), full_list.clone()),
                (shard(2), full_list),
                (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1])
            ])
        );
    }
490
    /// With `PruneMode::Before(36)` only changesets at block >= 36 must be indexed;
    /// earlier changesets (block 20) are skipped by the prune fast-forward.
    #[tokio::test]
    async fn insert_index_with_prune_mode() {
        // init
        let db = TestStageDB::default();

        // setup
        db.commit(|tx| {
            // we just need first and last
            tx.put::<tables::BlockBodyIndices>(
                0,
                StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
            )
            .unwrap();

            tx.put::<tables::BlockBodyIndices>(
                100,
                StoredBlockBodyIndices { tx_count: 5, ..Default::default() },
            )
            .unwrap();

            // setup changeset that are going to be applied to history index
            tx.put::<tables::StorageChangeSets>(block_number_address(20), storage(STORAGE_KEY))
                .unwrap();
            tx.put::<tables::StorageChangeSets>(block_number_address(36), storage(STORAGE_KEY))
                .unwrap();
            tx.put::<tables::StorageChangeSets>(block_number_address(100), storage(STORAGE_KEY))
                .unwrap();
            Ok(())
        })
        .unwrap();

        // run with pruning enabled (drive the stage directly, not via the `run` helper)
        let input = ExecInput { target: Some(20000), ..Default::default() };
        let mut stage = IndexStorageHistoryStage {
            prune_mode: Some(PruneMode::Before(36)),
            ..Default::default()
        };
        let provider = db.factory.database_provider_rw().unwrap();
        let out = stage.execute(&provider, input).unwrap();
        assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(20000), done: true });
        provider.commit().unwrap();

        // verify: block 20 was below the prune target and is absent
        let table = cast(db.table::<tables::StoragesHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![36, 100])]));

        // unwind
        unwind(&db, 20000, 0);

        // verify initial state
        let table = db.table::<tables::StoragesHistory>().unwrap();
        assert!(table.is_empty());
    }
544
    // Generates the shared stage test-suite for this stage via the runner below.
    stage_test_suite_ext!(IndexStorageHistoryTestRunner, index_storage_history);

    /// Harness wiring [`IndexStorageHistoryStage`] into the shared stage test-suite.
    struct IndexStorageHistoryTestRunner {
        pub(crate) db: TestStageDB,
        // Unwind commit threshold used when constructing the stage.
        commit_threshold: u64,
        // Optional prune configuration forwarded to the stage.
        prune_mode: Option<PruneMode>,
    }
552
553    impl Default for IndexStorageHistoryTestRunner {
554        fn default() -> Self {
555            Self { db: TestStageDB::default(), commit_threshold: 1000, prune_mode: None }
556        }
557    }
558
559    impl StageTestRunner for IndexStorageHistoryTestRunner {
560        type S = IndexStorageHistoryStage;
561
562        fn db(&self) -> &TestStageDB {
563            &self.db
564        }
565
566        fn stage(&self) -> Self::S {
567            Self::S {
568                commit_threshold: self.commit_threshold,
569                prune_mode: self.prune_mode,
570                etl_config: EtlConfig::default(),
571            }
572        }
573    }
574
    impl ExecuteStageTestRunner for IndexStorageHistoryTestRunner {
        type Seed = ();

        /// Seeds random accounts, blocks and storage changesets covering the stage's
        /// input range so `execute` has realistic data to index.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_process = input.checkpoint().block_number;
            let start = stage_process + 1;
            let end = input.target();
            let mut rng = generators::rng();

            let num_of_accounts = 31;
            let accounts = random_contract_account_range(&mut rng, &mut (0..num_of_accounts))
                .into_iter()
                .collect::<BTreeMap<_, _>>();

            let blocks = random_block_range(
                &mut rng,
                start..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..3, ..Default::default() },
            );

            let (changesets, _) = random_changeset_range(
                &mut rng,
                blocks.iter(),
                accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
                0..3,
                0..u64::MAX,
            );

            // add block changeset from block 1.
            self.db.insert_changesets(changesets, Some(start))?;

            Ok(())
        }

        /// Independently rebuilds the expected sharded history from the raw changesets
        /// and compares it against the `StoragesHistory` table written by the stage.
        fn validate_execution(
            &self,
            input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            if let Some(output) = output {
                let start_block = input.next_block();
                let end_block = output.checkpoint.block_number;
                if start_block > end_block {
                    return Ok(())
                }

                assert_eq!(
                    output,
                    ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }
                );

                let provider = self.db.factory.provider()?;
                let mut changeset_cursor =
                    provider.tx_ref().cursor_read::<tables::StorageChangeSets>()?;

                // Group changed block numbers by (address, storage key).
                let storage_transitions = changeset_cursor
                    .walk_range(BlockNumberAddress::range(start_block..=end_block))?
                    .try_fold(
                        BTreeMap::new(),
                        |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
                         entry|
                         -> Result<_, TestRunnerError> {
                            let (index, storage) = entry?;
                            storages
                                .entry((index.address(), storage.key))
                                .or_default()
                                .push(index.block_number());
                            Ok(storages)
                        },
                    )?;

                // Re-shard each group the same way the stage is expected to:
                // full chunks keyed by their highest block, the tail keyed by u64::MAX.
                let mut result = BTreeMap::new();
                for (partial_key, indices) in storage_transitions {
                    // chunk indices and insert them in shards of N size.
                    let mut chunks = indices
                        .iter()
                        .chunks(sharded_key::NUM_OF_INDICES_IN_SHARD)
                        .into_iter()
                        .map(|chunks| chunks.copied().collect::<Vec<u64>>())
                        .collect::<Vec<Vec<_>>>();
                    let last_chunk = chunks.pop();

                    for list in chunks {
                        result.insert(
                            StorageShardedKey::new(
                                partial_key.0,
                                partial_key.1,
                                *list.last().expect("Chuck does not return empty list")
                                    as BlockNumber,
                            ),
                            list,
                        );
                    }

                    if let Some(last_list) = last_chunk {
                        result.insert(
                            StorageShardedKey::new(partial_key.0, partial_key.1, u64::MAX),
                            last_list,
                        );
                    };
                }

                let table = cast(self.db.table::<tables::StoragesHistory>().unwrap());
                assert_eq!(table, result);
            }
            Ok(())
        }
    }
683
684    impl UnwindStageTestRunner for IndexStorageHistoryTestRunner {
685        fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
686            let table = self.db.table::<tables::StoragesHistory>().unwrap();
687            assert!(table.is_empty());
688            Ok(())
689        }
690    }
691
692    mod rocksdb_tests {
693        use super::*;
694        use reth_db_api::models::StorageBeforeTx;
695        use reth_provider::{providers::StaticFileWriter, RocksDBProviderFactory};
696        use reth_static_file_types::StaticFileSegment;
697        use reth_storage_api::StorageSettings;
698
        /// Sets up v2 storage test data: writes block body indices to MDBX and
        /// storage changesets to static files (matching realistic v2 layout).
        fn setup_v2_storage_data(db: &TestStageDB, block_range: std::ops::RangeInclusive<u64>) {
            // Switch the factory into v2 mode so the stage takes the RocksDB path.
            db.factory.set_storage_settings_cache(StorageSettings::v2());

            // Block body indices still live in MDBX.
            db.commit(|tx| {
                for block in block_range.clone() {
                    tx.put::<tables::BlockBodyIndices>(
                        block,
                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
                    )?;
                }
                Ok(())
            })
            .unwrap();

            // One changeset per block for the fixed account/slot, written to static files.
            let static_file_provider = db.factory.static_file_provider();
            let mut writer =
                static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
            for block in block_range {
                writer
                    .append_storage_changeset(
                        vec![StorageBeforeTx {
                            address: ADDRESS,
                            key: STORAGE_KEY,
                            value: U256::ZERO,
                        }],
                        block,
                    )
                    .unwrap();
            }
            writer.commit().unwrap();
        }
732
        /// Test that when `storages_history_in_rocksdb` is enabled, the stage
        /// writes storage history indices to `RocksDB` instead of MDBX.
        #[tokio::test]
        async fn execute_writes_to_rocksdb_when_enabled() {
            let db = TestStageDB::default();
            setup_v2_storage_data(&db, 0..=10);

            let input = ExecInput { target: Some(10), ..Default::default() };
            let mut stage = IndexStorageHistoryStage::default();
            let provider = db.factory.database_provider_rw().unwrap();
            let out = stage.execute(&provider, input).unwrap();
            assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
            provider.commit().unwrap();

            // The MDBX table must stay untouched on the v2 path.
            let mdbx_table = db.table::<tables::StoragesHistory>().unwrap();
            assert!(
                mdbx_table.is_empty(),
                "MDBX StoragesHistory should be empty when RocksDB is enabled"
            );

            // The open shard in RocksDB must contain every indexed block.
            let rocksdb = db.factory.rocksdb_provider();
            let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
            assert!(result.is_some(), "RocksDB should contain storage history");

            let block_list = result.unwrap();
            let blocks: Vec<u64> = block_list.iter().collect();
            assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
        }
761
        /// Test that unwind works correctly when `storages_history_in_rocksdb` is enabled:
        /// after unwinding to block 5, only blocks 0..=5 remain indexed in RocksDB.
        #[tokio::test]
        async fn unwind_works_when_rocksdb_enabled() {
            let db = TestStageDB::default();
            setup_v2_storage_data(&db, 0..=10);

            // Index blocks 0..=10 and commit.
            let input = ExecInput { target: Some(10), ..Default::default() };
            let mut stage = IndexStorageHistoryStage::default();
            let provider = db.factory.database_provider_rw().unwrap();
            let out = stage.execute(&provider, input).unwrap();
            assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
            provider.commit().unwrap();

            let rocksdb = db.factory.rocksdb_provider();
            let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
            assert!(result.is_some(), "RocksDB should have data before unwind");
            let blocks_before: Vec<u64> = result.unwrap().iter().collect();
            assert_eq!(blocks_before, (0..=10).collect::<Vec<_>>());

            // Unwind 10 -> 5 and commit.
            let unwind_input =
                UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 5, bad_block: None };
            let provider = db.factory.database_provider_rw().unwrap();
            let out = stage.unwind(&provider, unwind_input).unwrap();
            assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(5) });
            provider.commit().unwrap();

            let rocksdb = db.factory.rocksdb_provider();
            let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
            assert!(result.is_some(), "RocksDB should still have data after partial unwind");
            let blocks_after: Vec<u64> = result.unwrap().iter().collect();
            assert_eq!(
                blocks_after,
                (0..=5).collect::<Vec<_>>(),
                "Should only have blocks 0-5 after unwind to block 5"
            );
        }
798
799        /// Test that unwind to block 0 keeps only block 0's history.
800        #[tokio::test]
801        async fn unwind_to_zero_keeps_block_zero() {
802            let db = TestStageDB::default();
803            setup_v2_storage_data(&db, 0..=5);
804
805            let input = ExecInput { target: Some(5), ..Default::default() };
806            let mut stage = IndexStorageHistoryStage::default();
807            let provider = db.factory.database_provider_rw().unwrap();
808            let out = stage.execute(&provider, input).unwrap();
809            assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
810            provider.commit().unwrap();
811
812            let rocksdb = db.factory.rocksdb_provider();
813            let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
814            assert!(result.is_some(), "RocksDB should have data before unwind");
815
816            let unwind_input =
817                UnwindInput { checkpoint: StageCheckpoint::new(5), unwind_to: 0, bad_block: None };
818            let provider = db.factory.database_provider_rw().unwrap();
819            let out = stage.unwind(&provider, unwind_input).unwrap();
820            assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(0) });
821            provider.commit().unwrap();
822
823            let rocksdb = db.factory.rocksdb_provider();
824            let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
825            assert!(result.is_some(), "RocksDB should still have block 0 history");
826            let blocks_after: Vec<u64> = result.unwrap().iter().collect();
827            assert_eq!(blocks_after, vec![0], "Should only have block 0 after unwinding to 0");
828        }
829
830        /// Test incremental sync merges new data with existing shards.
831        #[tokio::test]
832        async fn execute_incremental_sync() {
833            let db = TestStageDB::default();
834            setup_v2_storage_data(&db, 0..=10);
835
836            let input = ExecInput { target: Some(5), ..Default::default() };
837            let mut stage = IndexStorageHistoryStage::default();
838            let provider = db.factory.database_provider_rw().unwrap();
839            let out = stage.execute(&provider, input).unwrap();
840            assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
841            provider.commit().unwrap();
842
843            let rocksdb = db.factory.rocksdb_provider();
844            let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
845            assert!(result.is_some());
846            let blocks: Vec<u64> = result.unwrap().iter().collect();
847            assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
848
849            let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(5)) };
850            let provider = db.factory.database_provider_rw().unwrap();
851            let out = stage.execute(&provider, input).unwrap();
852            assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
853            provider.commit().unwrap();
854
855            let rocksdb = db.factory.rocksdb_provider();
856            let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
857            assert!(result.is_some(), "RocksDB should have merged data");
858            let blocks: Vec<u64> = result.unwrap().iter().collect();
859            assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
860        }
861
862        /// Test multi-shard unwind correctly handles shards that span across unwind boundary.
863        #[tokio::test]
864        async fn unwind_multi_shard() {
865            use reth_db_api::models::sharded_key::NUM_OF_INDICES_IN_SHARD;
866
867            let db = TestStageDB::default();
868            let num_blocks = (NUM_OF_INDICES_IN_SHARD * 2 + 100) as u64;
869            setup_v2_storage_data(&db, 0..=num_blocks - 1);
870
871            let input = ExecInput { target: Some(num_blocks - 1), ..Default::default() };
872            let mut stage = IndexStorageHistoryStage::default();
873            let provider = db.factory.database_provider_rw().unwrap();
874            let out = stage.execute(&provider, input).unwrap();
875            assert_eq!(
876                out,
877                ExecOutput { checkpoint: StageCheckpoint::new(num_blocks - 1), done: true }
878            );
879            provider.commit().unwrap();
880
881            let rocksdb = db.factory.rocksdb_provider();
882            let shards = rocksdb.storage_history_shards(ADDRESS, STORAGE_KEY).unwrap();
883            assert!(shards.len() >= 2, "Should have at least 2 shards for {} blocks", num_blocks);
884
885            let unwind_to = NUM_OF_INDICES_IN_SHARD as u64 + 50;
886            let unwind_input = UnwindInput {
887                checkpoint: StageCheckpoint::new(num_blocks - 1),
888                unwind_to,
889                bad_block: None,
890            };
891            let provider = db.factory.database_provider_rw().unwrap();
892            let out = stage.unwind(&provider, unwind_input).unwrap();
893            assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) });
894            provider.commit().unwrap();
895
896            let rocksdb = db.factory.rocksdb_provider();
897            let shards_after = rocksdb.storage_history_shards(ADDRESS, STORAGE_KEY).unwrap();
898            assert!(!shards_after.is_empty(), "Should still have shards after unwind");
899
900            let all_blocks: Vec<u64> =
901                shards_after.iter().flat_map(|(_, list)| list.iter()).collect();
902            assert_eq!(
903                all_blocks,
904                (0..=unwind_to).collect::<Vec<_>>(),
905                "Should only have blocks 0 to {} after unwind",
906                unwind_to
907            );
908        }
909    }
910}