Skip to main content

reth_stages/stages/
index_account_history.rs

1use super::collect_account_history_indices;
2use crate::stages::utils::{collect_history_indices, load_account_history};
3use reth_config::config::{EtlConfig, IndexHistoryConfig};
4use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut, Tables};
5use reth_provider::{
6    DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
7    RocksDBProviderFactory, StorageSettingsCache,
8};
9use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
10use reth_stages_api::{
11    ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
12};
13use std::fmt::Debug;
14use tracing::info;
15
16/// Stage is indexing history the account changesets generated in
17/// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information
18/// on index sharding take a look at [`tables::AccountsHistory`]
19#[derive(Debug)]
20pub struct IndexAccountHistoryStage {
21    /// Number of blocks after which the control
22    /// flow will be returned to the pipeline for commit.
23    pub commit_threshold: u64,
24    /// Pruning configuration.
25    pub prune_mode: Option<PruneMode>,
26    /// ETL configuration
27    pub etl_config: EtlConfig,
28}
29
30impl IndexAccountHistoryStage {
31    /// Create new instance of [`IndexAccountHistoryStage`].
32    pub const fn new(
33        config: IndexHistoryConfig,
34        etl_config: EtlConfig,
35        prune_mode: Option<PruneMode>,
36    ) -> Self {
37        Self { commit_threshold: config.commit_threshold, etl_config, prune_mode }
38    }
39}
40
41impl Default for IndexAccountHistoryStage {
42    fn default() -> Self {
43        Self { commit_threshold: 100_000, prune_mode: None, etl_config: EtlConfig::default() }
44    }
45}
46
47impl<Provider> Stage<Provider> for IndexAccountHistoryStage
48where
49    Provider: DBProvider<Tx: DbTxMut>
50        + HistoryWriter
51        + PruneCheckpointReader
52        + PruneCheckpointWriter
53        + reth_storage_api::ChangeSetReader
54        + reth_provider::StaticFileProviderFactory
55        + StorageSettingsCache
56        + RocksDBProviderFactory,
57{
58    /// Return the id of the stage
59    fn id(&self) -> StageId {
60        StageId::IndexAccountHistory
61    }
62
63    /// Execute the stage.
64    fn execute(
65        &mut self,
66        provider: &Provider,
67        mut input: ExecInput,
68    ) -> Result<ExecOutput, StageError> {
69        if let Some((target_prunable_block, prune_mode)) = self
70            .prune_mode
71            .map(|mode| {
72                mode.prune_target_block(
73                    input.target(),
74                    PruneSegment::AccountHistory,
75                    PrunePurpose::User,
76                )
77            })
78            .transpose()?
79            .flatten() &&
80            target_prunable_block > input.checkpoint().block_number
81        {
82            input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
83
84            // Save prune checkpoint only if we don't have one already.
85            // Otherwise, pruner may skip the unpruned range of blocks.
86            if provider.get_prune_checkpoint(PruneSegment::AccountHistory)?.is_none() {
87                provider.save_prune_checkpoint(
88                    PruneSegment::AccountHistory,
89                    PruneCheckpoint {
90                        block_number: Some(target_prunable_block),
91                        tx_number: None,
92                        prune_mode,
93                    },
94                )?;
95            }
96        }
97
98        if input.target_reached() {
99            return Ok(ExecOutput::done(input.checkpoint()))
100        }
101
102        let mut range = input.next_block_range();
103        let first_sync = input.checkpoint().block_number == 0;
104        let use_rocksdb = provider.cached_storage_settings().storage_v2;
105
106        // On first sync we might have history coming from genesis. We clear the table since it's
107        // faster to rebuild from scratch.
108        if first_sync {
109            if use_rocksdb {
110                // Note: RocksDB clear() executes immediately (not deferred to commit like MDBX),
111                // but this is safe for first_sync because if we crash before commit, the
112                // checkpoint stays at 0 and we'll just clear and rebuild again on restart. The
113                // source data (changesets) is intact.
114                provider.rocksdb_provider().clear::<tables::AccountsHistory>()?;
115            } else {
116                provider.tx_ref().clear::<tables::AccountsHistory>()?;
117            }
118            range = 0..=*input.next_block_range().end();
119        }
120
121        info!(target: "sync::stages::index_account_history::exec", ?first_sync, ?use_rocksdb, "Collecting indices");
122
123        let collector = if provider.cached_storage_settings().storage_v2 {
124            // Use the provider-based collection that can read from static files.
125            collect_account_history_indices(provider, range.clone(), &self.etl_config)?
126        } else {
127            collect_history_indices::<_, tables::AccountChangeSets, tables::AccountsHistory, _>(
128                provider,
129                range.clone(),
130                ShardedKey::new,
131                |(index, value)| (index, value.address),
132                &self.etl_config,
133            )?
134        };
135
136        info!(target: "sync::stages::index_account_history::exec", "Loading indices into database");
137
138        provider.with_rocksdb_batch_auto_commit(|rocksdb_batch| {
139            let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;
140            load_account_history(collector, first_sync, &mut writer)
141                .map_err(|e| reth_provider::ProviderError::other(Box::new(e)))?;
142            Ok(((), writer.into_raw_rocksdb_batch()))
143        })?;
144
145        if use_rocksdb {
146            provider.commit_pending_rocksdb_batches()?;
147            provider.rocksdb_provider().flush(&[Tables::AccountsHistory.name()])?;
148        }
149
150        Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
151    }
152
153    /// Unwind the stage.
154    fn unwind(
155        &mut self,
156        provider: &Provider,
157        input: UnwindInput,
158    ) -> Result<UnwindOutput, StageError> {
159        let (range, unwind_progress, _) =
160            input.unwind_block_range_with_threshold(self.commit_threshold);
161
162        provider.unwind_account_history_indices_range(range)?;
163
164        // from HistoryIndex higher than that number.
165        Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
166    }
167}
168
169#[cfg(test)]
170mod tests {
171    use super::*;
172    use crate::test_utils::{
173        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
174        TestStageDB, UnwindStageTestRunner,
175    };
176    use alloy_primitives::{address, Address, BlockNumber, B256};
177    use itertools::Itertools;
178    use reth_db_api::{
179        cursor::DbCursorRO,
180        models::{
181            sharded_key, sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx,
182            StoredBlockBodyIndices,
183        },
184        transaction::DbTx,
185        BlockNumberList,
186    };
187    use reth_provider::{providers::StaticFileWriter, DatabaseProviderFactory};
188    use reth_testing_utils::generators::{
189        self, random_block_range, random_changeset_range, random_contract_account_range,
190        BlockRangeParams,
191    };
192    use std::collections::BTreeMap;
193
194    const ADDRESS: Address = address!("0x0000000000000000000000000000000000000001");
195
196    const LAST_BLOCK_IN_FULL_SHARD: BlockNumber = NUM_OF_INDICES_IN_SHARD as BlockNumber;
197    const MAX_BLOCK: BlockNumber = NUM_OF_INDICES_IN_SHARD as BlockNumber + 2;
198
199    const fn acc() -> AccountBeforeTx {
200        AccountBeforeTx { address: ADDRESS, info: None }
201    }
202
203    /// Shard for account
204    const fn shard(shard_index: u64) -> ShardedKey<Address> {
205        ShardedKey { key: ADDRESS, highest_block_number: shard_index }
206    }
207
208    fn list(list: &[u64]) -> BlockNumberList {
209        BlockNumberList::new(list.iter().copied()).unwrap()
210    }
211
212    fn cast(
213        table: Vec<(ShardedKey<Address>, BlockNumberList)>,
214    ) -> BTreeMap<ShardedKey<Address>, Vec<u64>> {
215        table
216            .into_iter()
217            .map(|(k, v)| {
218                let v = v.iter().collect();
219                (k, v)
220            })
221            .collect()
222    }
223
224    fn partial_setup(db: &TestStageDB) {
225        // setup
226        db.commit(|tx| {
227            for block in 0..=MAX_BLOCK {
228                tx.put::<tables::BlockBodyIndices>(
229                    block,
230                    StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
231                )?;
232                // setup changeset that is going to be applied to history index
233                tx.put::<tables::AccountChangeSets>(block, acc())?;
234            }
235            Ok(())
236        })
237        .unwrap()
238    }
239
240    fn run(db: &TestStageDB, run_to: u64, input_checkpoint: Option<BlockNumber>) {
241        let input = ExecInput {
242            target: Some(run_to),
243            checkpoint: input_checkpoint
244                .map(|block_number| StageCheckpoint { block_number, stage_checkpoint: None }),
245        };
246        let mut stage = IndexAccountHistoryStage::default();
247        let provider = db.factory.database_provider_rw().unwrap();
248        let out = stage.execute(&provider, input).unwrap();
249        assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(run_to), done: true });
250        provider.commit().unwrap();
251    }
252
253    fn unwind(db: &TestStageDB, unwind_from: u64, unwind_to: u64) {
254        let input = UnwindInput {
255            checkpoint: StageCheckpoint::new(unwind_from),
256            unwind_to,
257            ..Default::default()
258        };
259        let mut stage = IndexAccountHistoryStage::default();
260        let provider = db.factory.database_provider_rw().unwrap();
261        let out = stage.unwind(&provider, input).unwrap();
262        assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) });
263        provider.commit().unwrap();
264    }
265
266    #[tokio::test]
267    async fn insert_index_to_genesis() {
268        // init
269        let db = TestStageDB::default();
270
271        // setup
272        partial_setup(&db);
273
274        // run
275        run(&db, 3, None);
276
277        // verify
278        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
279        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0, 1, 2, 3])]));
280
281        // unwind
282        unwind(&db, 3, 0);
283
284        // verify initial state
285        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
286        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0])]));
287    }
288
289    #[tokio::test]
290    async fn insert_index_to_not_empty_shard() {
291        // init
292        let db = TestStageDB::default();
293
294        // setup
295        partial_setup(&db);
296        db.commit(|tx| {
297            tx.put::<tables::AccountsHistory>(shard(u64::MAX), list(&[1, 2, 3])).unwrap();
298            Ok(())
299        })
300        .unwrap();
301
302        // run
303        run(&db, 5, Some(3));
304
305        // verify
306        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
307        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3, 4, 5])]));
308
309        // unwind
310        unwind(&db, 5, 3);
311
312        // verify initial state
313        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
314        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3])]));
315    }
316
317    #[tokio::test]
318    async fn insert_index_to_full_shard() {
319        // init
320        let db = TestStageDB::default();
321        let full_list = (1..=LAST_BLOCK_IN_FULL_SHARD).collect::<Vec<_>>();
322        assert_eq!(full_list.len(), NUM_OF_INDICES_IN_SHARD);
323
324        // setup
325        partial_setup(&db);
326        db.commit(|tx| {
327            tx.put::<tables::AccountsHistory>(shard(u64::MAX), list(&full_list)).unwrap();
328            Ok(())
329        })
330        .unwrap();
331
332        // run
333        run(&db, LAST_BLOCK_IN_FULL_SHARD + 2, Some(LAST_BLOCK_IN_FULL_SHARD));
334
335        // verify
336        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
337        assert_eq!(
338            table,
339            BTreeMap::from([
340                (shard(LAST_BLOCK_IN_FULL_SHARD), full_list.clone()),
341                (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1, LAST_BLOCK_IN_FULL_SHARD + 2])
342            ])
343        );
344
345        // unwind
346        unwind(&db, LAST_BLOCK_IN_FULL_SHARD + 2, LAST_BLOCK_IN_FULL_SHARD);
347
348        // verify initial state
349        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
350        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), full_list)]));
351    }
352
353    #[tokio::test]
354    async fn insert_index_to_fill_shard() {
355        // init
356        let db = TestStageDB::default();
357        let mut almost_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 2).collect::<Vec<_>>();
358
359        // setup
360        partial_setup(&db);
361        db.commit(|tx| {
362            tx.put::<tables::AccountsHistory>(shard(u64::MAX), list(&almost_full_list)).unwrap();
363            Ok(())
364        })
365        .unwrap();
366
367        // run
368        run(&db, LAST_BLOCK_IN_FULL_SHARD, Some(LAST_BLOCK_IN_FULL_SHARD - 2));
369
370        // verify
371        almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD - 1);
372        almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD);
373        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
374        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list.clone())]));
375
376        // unwind
377        unwind(&db, LAST_BLOCK_IN_FULL_SHARD, LAST_BLOCK_IN_FULL_SHARD - 2);
378
379        // verify initial state
380        almost_full_list.pop();
381        almost_full_list.pop();
382        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
383        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list)]));
384
385        // verify initial state
386    }
387
388    #[tokio::test]
389    async fn insert_index_second_half_shard() {
390        // init
391        let db = TestStageDB::default();
392        let mut almost_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 1).collect::<Vec<_>>();
393
394        // setup
395        partial_setup(&db);
396        db.commit(|tx| {
397            tx.put::<tables::AccountsHistory>(shard(u64::MAX), list(&almost_full_list)).unwrap();
398            Ok(())
399        })
400        .unwrap();
401
402        // run
403        run(&db, LAST_BLOCK_IN_FULL_SHARD + 1, Some(LAST_BLOCK_IN_FULL_SHARD - 1));
404
405        // verify
406        almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD);
407        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
408        assert_eq!(
409            table,
410            BTreeMap::from([
411                (shard(LAST_BLOCK_IN_FULL_SHARD), almost_full_list.clone()),
412                (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1])
413            ])
414        );
415
416        // unwind
417        unwind(&db, LAST_BLOCK_IN_FULL_SHARD, LAST_BLOCK_IN_FULL_SHARD - 1);
418
419        // verify initial state
420        almost_full_list.pop();
421        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
422        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list)]));
423    }
424
425    #[tokio::test]
426    async fn insert_index_to_third_shard() {
427        // init
428        let db = TestStageDB::default();
429        let full_list = (1..=LAST_BLOCK_IN_FULL_SHARD).collect::<Vec<_>>();
430
431        // setup
432        partial_setup(&db);
433        db.commit(|tx| {
434            tx.put::<tables::AccountsHistory>(shard(1), list(&full_list)).unwrap();
435            tx.put::<tables::AccountsHistory>(shard(2), list(&full_list)).unwrap();
436            tx.put::<tables::AccountsHistory>(
437                shard(u64::MAX),
438                list(&[LAST_BLOCK_IN_FULL_SHARD + 1]),
439            )
440            .unwrap();
441            Ok(())
442        })
443        .unwrap();
444
445        run(&db, LAST_BLOCK_IN_FULL_SHARD + 2, Some(LAST_BLOCK_IN_FULL_SHARD + 1));
446
447        // verify
448        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
449        assert_eq!(
450            table,
451            BTreeMap::from([
452                (shard(1), full_list.clone()),
453                (shard(2), full_list.clone()),
454                (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1, LAST_BLOCK_IN_FULL_SHARD + 2])
455            ])
456        );
457
458        // unwind
459        unwind(&db, LAST_BLOCK_IN_FULL_SHARD + 2, LAST_BLOCK_IN_FULL_SHARD + 1);
460
461        // verify initial state
462        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
463        assert_eq!(
464            table,
465            BTreeMap::from([
466                (shard(1), full_list.clone()),
467                (shard(2), full_list),
468                (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1])
469            ])
470        );
471    }
472
473    #[tokio::test]
474    async fn insert_index_with_prune_mode() {
475        // init
476        let db = TestStageDB::default();
477
478        // setup
479        db.commit(|tx| {
480            // we just need first and last
481            tx.put::<tables::BlockBodyIndices>(
482                0,
483                StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
484            )
485            .unwrap();
486
487            tx.put::<tables::BlockBodyIndices>(
488                100,
489                StoredBlockBodyIndices { tx_count: 5, ..Default::default() },
490            )
491            .unwrap();
492
493            // setup changeset that are going to be applied to history index
494            tx.put::<tables::AccountChangeSets>(20, acc()).unwrap();
495            tx.put::<tables::AccountChangeSets>(36, acc()).unwrap();
496            tx.put::<tables::AccountChangeSets>(100, acc()).unwrap();
497            Ok(())
498        })
499        .unwrap();
500
501        // run
502        let input = ExecInput { target: Some(20000), ..Default::default() };
503        let mut stage = IndexAccountHistoryStage {
504            prune_mode: Some(PruneMode::Before(36)),
505            ..Default::default()
506        };
507        let provider = db.factory.database_provider_rw().unwrap();
508        let out = stage.execute(&provider, input).unwrap();
509        assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(20000), done: true });
510        provider.commit().unwrap();
511
512        // verify
513        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
514        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![36, 100])]));
515
516        // unwind
517        unwind(&db, 20000, 0);
518
519        // verify initial state
520        let table = db.table::<tables::AccountsHistory>().unwrap();
521        assert!(table.is_empty());
522    }
523
524    stage_test_suite_ext!(IndexAccountHistoryTestRunner, index_account_history);
525
526    struct IndexAccountHistoryTestRunner {
527        pub(crate) db: TestStageDB,
528        commit_threshold: u64,
529        prune_mode: Option<PruneMode>,
530    }
531
532    impl Default for IndexAccountHistoryTestRunner {
533        fn default() -> Self {
534            Self { db: TestStageDB::default(), commit_threshold: 1000, prune_mode: None }
535        }
536    }
537
538    impl StageTestRunner for IndexAccountHistoryTestRunner {
539        type S = IndexAccountHistoryStage;
540
541        fn db(&self) -> &TestStageDB {
542            &self.db
543        }
544
545        fn stage(&self) -> Self::S {
546            Self::S {
547                commit_threshold: self.commit_threshold,
548                prune_mode: self.prune_mode,
549                etl_config: EtlConfig::default(),
550            }
551        }
552    }
553
554    impl ExecuteStageTestRunner for IndexAccountHistoryTestRunner {
555        type Seed = ();
556
557        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
558            let stage_process = input.checkpoint().block_number;
559            let start = stage_process + 1;
560            let end = input.target();
561            let mut rng = generators::rng();
562
563            let num_of_accounts = 31;
564            let accounts = random_contract_account_range(&mut rng, &mut (0..num_of_accounts))
565                .into_iter()
566                .collect::<BTreeMap<_, _>>();
567
568            let blocks = random_block_range(
569                &mut rng,
570                start..=end,
571                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..3, ..Default::default() },
572            );
573
574            let (changesets, _) = random_changeset_range(
575                &mut rng,
576                blocks.iter(),
577                accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
578                0..3,
579                0..256,
580            );
581
582            // add block changeset from block 1.
583            self.db.insert_changesets(changesets, Some(start))?;
584
585            Ok(())
586        }
587
588        fn validate_execution(
589            &self,
590            input: ExecInput,
591            output: Option<ExecOutput>,
592        ) -> Result<(), TestRunnerError> {
593            if let Some(output) = output {
594                let start_block = input.next_block();
595                let end_block = output.checkpoint.block_number;
596                if start_block > end_block {
597                    return Ok(())
598                }
599
600                assert_eq!(
601                    output,
602                    ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }
603                );
604
605                let provider = self.db.factory.provider()?;
606                let mut changeset_cursor =
607                    provider.tx_ref().cursor_read::<tables::AccountChangeSets>()?;
608
609                let account_transitions =
610                    changeset_cursor.walk_range(start_block..=end_block)?.try_fold(
611                        BTreeMap::new(),
612                        |mut accounts: BTreeMap<Address, Vec<u64>>,
613                         entry|
614                         -> Result<_, TestRunnerError> {
615                            let (index, account) = entry?;
616                            accounts.entry(account.address).or_default().push(index);
617                            Ok(accounts)
618                        },
619                    )?;
620
621                let mut result = BTreeMap::new();
622                for (address, indices) in account_transitions {
623                    // chunk indices and insert them in shards of N size.
624                    let mut chunks = indices
625                        .iter()
626                        .chunks(sharded_key::NUM_OF_INDICES_IN_SHARD)
627                        .into_iter()
628                        .map(|chunks| chunks.copied().collect::<Vec<_>>())
629                        .collect::<Vec<Vec<_>>>();
630                    let last_chunk = chunks.pop();
631
632                    for list in chunks {
633                        result.insert(
634                            ShardedKey::new(
635                                address,
636                                *list.last().expect("Chuck does not return empty list")
637                                    as BlockNumber,
638                            ),
639                            list,
640                        );
641                    }
642
643                    if let Some(last_list) = last_chunk {
644                        result.insert(ShardedKey::new(address, u64::MAX), last_list);
645                    };
646                }
647
648                let table = cast(self.db.table::<tables::AccountsHistory>().unwrap());
649                assert_eq!(table, result);
650            }
651            Ok(())
652        }
653    }
654
655    impl UnwindStageTestRunner for IndexAccountHistoryTestRunner {
656        fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
657            let table = self.db.table::<tables::AccountsHistory>().unwrap();
658            assert!(table.is_empty());
659            Ok(())
660        }
661    }
662
663    mod rocksdb_tests {
664        use super::*;
665        use reth_provider::{
666            providers::StaticFileWriter, RocksDBProviderFactory, StaticFileProviderFactory,
667        };
668        use reth_static_file_types::StaticFileSegment;
669        use reth_storage_api::StorageSettings;
670
671        /// Sets up v2 account test data: writes block body indices to MDBX and
672        /// account changesets to static files (matching realistic v2 layout).
673        fn setup_v2_account_data(db: &TestStageDB, block_range: std::ops::RangeInclusive<u64>) {
674            db.factory.set_storage_settings_cache(StorageSettings::v2());
675
676            db.commit(|tx| {
677                for block in block_range.clone() {
678                    tx.put::<tables::BlockBodyIndices>(
679                        block,
680                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
681                    )?;
682                }
683                Ok(())
684            })
685            .unwrap();
686
687            let static_file_provider = db.factory.static_file_provider();
688            let mut writer =
689                static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
690            for block in block_range {
691                writer.append_account_changeset(vec![acc()], block).unwrap();
692            }
693            writer.commit().unwrap();
694        }
695
696        /// Test that when `account_history_in_rocksdb` is enabled, the stage
697        /// writes account history indices to `RocksDB` instead of MDBX.
698        #[tokio::test]
699        async fn execute_writes_to_rocksdb_when_enabled() {
700            let db = TestStageDB::default();
701            setup_v2_account_data(&db, 0..=10);
702
703            let input = ExecInput { target: Some(10), ..Default::default() };
704            let mut stage = IndexAccountHistoryStage::default();
705            let provider = db.factory.database_provider_rw().unwrap();
706            let out = stage.execute(&provider, input).unwrap();
707            assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
708            provider.commit().unwrap();
709
710            // Verify MDBX table is empty (data should be in RocksDB)
711            let mdbx_table = db.table::<tables::AccountsHistory>().unwrap();
712            assert!(
713                mdbx_table.is_empty(),
714                "MDBX AccountsHistory should be empty when RocksDB is enabled"
715            );
716
717            // Verify RocksDB has the data
718            let rocksdb = db.factory.rocksdb_provider();
719            let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
720            assert!(result.is_some(), "RocksDB should contain account history");
721
722            let block_list = result.unwrap();
723            let blocks: Vec<u64> = block_list.iter().collect();
724            assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
725        }
726
727        /// Test that unwind works correctly when `account_history_in_rocksdb` is enabled.
728        #[tokio::test]
729        async fn unwind_works_when_rocksdb_enabled() {
730            let db = TestStageDB::default();
731            setup_v2_account_data(&db, 0..=10);
732
733            let input = ExecInput { target: Some(10), ..Default::default() };
734            let mut stage = IndexAccountHistoryStage::default();
735            let provider = db.factory.database_provider_rw().unwrap();
736            let out = stage.execute(&provider, input).unwrap();
737            assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
738            provider.commit().unwrap();
739
740            // Verify RocksDB has blocks 0-10 before unwind
741            let rocksdb = db.factory.rocksdb_provider();
742            let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
743            assert!(result.is_some(), "RocksDB should have data before unwind");
744            let blocks_before: Vec<u64> = result.unwrap().iter().collect();
745            assert_eq!(blocks_before, (0..=10).collect::<Vec<_>>());
746
747            // Unwind to block 5 (remove blocks 6-10)
748            let unwind_input =
749                UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 5, bad_block: None };
750            let provider = db.factory.database_provider_rw().unwrap();
751            let out = stage.unwind(&provider, unwind_input).unwrap();
752            assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(5) });
753            provider.commit().unwrap();
754
755            // Verify RocksDB now only has blocks 0-5 (blocks 6-10 removed)
756            let rocksdb = db.factory.rocksdb_provider();
757            let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
758            assert!(result.is_some(), "RocksDB should still have data after unwind");
759            let blocks_after: Vec<u64> = result.unwrap().iter().collect();
760            assert_eq!(blocks_after, (0..=5).collect::<Vec<_>>(), "Should only have blocks 0-5");
761        }
762
763        /// Test incremental sync merges new data with existing shards.
764        #[tokio::test]
765        async fn execute_incremental_sync() {
766            let db = TestStageDB::default();
767            setup_v2_account_data(&db, 0..=10);
768
769            let input = ExecInput { target: Some(5), ..Default::default() };
770            let mut stage = IndexAccountHistoryStage::default();
771            let provider = db.factory.database_provider_rw().unwrap();
772            let out = stage.execute(&provider, input).unwrap();
773            assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
774            provider.commit().unwrap();
775
776            let rocksdb = db.factory.rocksdb_provider();
777            let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
778            assert!(result.is_some());
779            let blocks: Vec<u64> = result.unwrap().iter().collect();
780            assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
781
782            let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(5)) };
783            let provider = db.factory.database_provider_rw().unwrap();
784            let out = stage.execute(&provider, input).unwrap();
785            assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
786            provider.commit().unwrap();
787
788            let rocksdb = db.factory.rocksdb_provider();
789            let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
790            assert!(result.is_some(), "RocksDB should have merged data");
791            let blocks: Vec<u64> = result.unwrap().iter().collect();
792            assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
793        }
794    }
795}