1use super::collect_account_history_indices;
2use crate::stages::utils::{collect_history_indices, load_account_history};
3use reth_config::config::{EtlConfig, IndexHistoryConfig};
4use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut, Tables};
5use reth_provider::{
6 DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
7 RocksDBProviderFactory, StorageSettingsCache,
8};
9use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
10use reth_stages_api::{
11 ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
12};
13use std::fmt::Debug;
14use tracing::info;
15
/// Stage that builds the account history index ([`tables::AccountsHistory`]):
/// for each account, the list of block numbers at which it changed.
#[derive(Debug)]
pub struct IndexAccountHistoryStage {
    /// Number of blocks to unwind per commit batch (used only by `unwind`).
    pub commit_threshold: u64,
    /// Pruning configuration for the account history segment, if enabled.
    pub prune_mode: Option<PruneMode>,
    /// ETL configuration passed to the index collectors.
    pub etl_config: EtlConfig,
}
29
30impl IndexAccountHistoryStage {
31 pub const fn new(
33 config: IndexHistoryConfig,
34 etl_config: EtlConfig,
35 prune_mode: Option<PruneMode>,
36 ) -> Self {
37 Self { commit_threshold: config.commit_threshold, etl_config, prune_mode }
38 }
39}
40
41impl Default for IndexAccountHistoryStage {
42 fn default() -> Self {
43 Self { commit_threshold: 100_000, prune_mode: None, etl_config: EtlConfig::default() }
44 }
45}
46
47impl<Provider> Stage<Provider> for IndexAccountHistoryStage
48where
49 Provider: DBProvider<Tx: DbTxMut>
50 + HistoryWriter
51 + PruneCheckpointReader
52 + PruneCheckpointWriter
53 + reth_storage_api::ChangeSetReader
54 + reth_provider::StaticFileProviderFactory
55 + StorageSettingsCache
56 + RocksDBProviderFactory,
57{
58 fn id(&self) -> StageId {
60 StageId::IndexAccountHistory
61 }
62
63 fn execute(
65 &mut self,
66 provider: &Provider,
67 mut input: ExecInput,
68 ) -> Result<ExecOutput, StageError> {
69 if let Some((target_prunable_block, prune_mode)) = self
70 .prune_mode
71 .map(|mode| {
72 mode.prune_target_block(
73 input.target(),
74 PruneSegment::AccountHistory,
75 PrunePurpose::User,
76 )
77 })
78 .transpose()?
79 .flatten() &&
80 target_prunable_block > input.checkpoint().block_number
81 {
82 input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
83
84 if provider.get_prune_checkpoint(PruneSegment::AccountHistory)?.is_none() {
87 provider.save_prune_checkpoint(
88 PruneSegment::AccountHistory,
89 PruneCheckpoint {
90 block_number: Some(target_prunable_block),
91 tx_number: None,
92 prune_mode,
93 },
94 )?;
95 }
96 }
97
98 if input.target_reached() {
99 return Ok(ExecOutput::done(input.checkpoint()))
100 }
101
102 let mut range = input.next_block_range();
103 let first_sync = input.checkpoint().block_number == 0;
104 let use_rocksdb = provider.cached_storage_settings().storage_v2;
105
106 if first_sync {
109 if use_rocksdb {
110 provider.rocksdb_provider().clear::<tables::AccountsHistory>()?;
115 } else {
116 provider.tx_ref().clear::<tables::AccountsHistory>()?;
117 }
118 range = 0..=*input.next_block_range().end();
119 }
120
121 info!(target: "sync::stages::index_account_history::exec", ?first_sync, ?use_rocksdb, "Collecting indices");
122
123 let collector = if provider.cached_storage_settings().storage_v2 {
124 collect_account_history_indices(provider, range.clone(), &self.etl_config)?
126 } else {
127 collect_history_indices::<_, tables::AccountChangeSets, tables::AccountsHistory, _>(
128 provider,
129 range.clone(),
130 ShardedKey::new,
131 |(index, value)| (index, value.address),
132 &self.etl_config,
133 )?
134 };
135
136 info!(target: "sync::stages::index_account_history::exec", "Loading indices into database");
137
138 provider.with_rocksdb_batch_auto_commit(|rocksdb_batch| {
139 let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;
140 load_account_history(collector, first_sync, &mut writer)
141 .map_err(|e| reth_provider::ProviderError::other(Box::new(e)))?;
142 Ok(((), writer.into_raw_rocksdb_batch()))
143 })?;
144
145 if use_rocksdb {
146 provider.commit_pending_rocksdb_batches()?;
147 provider.rocksdb_provider().flush(&[Tables::AccountsHistory.name()])?;
148 }
149
150 Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
151 }
152
153 fn unwind(
155 &mut self,
156 provider: &Provider,
157 input: UnwindInput,
158 ) -> Result<UnwindOutput, StageError> {
159 let (range, unwind_progress, _) =
160 input.unwind_block_range_with_threshold(self.commit_threshold);
161
162 provider.unwind_account_history_indices_range(range)?;
163
164 Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
166 }
167}
168
169#[cfg(test)]
170mod tests {
171 use super::*;
172 use crate::test_utils::{
173 stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
174 TestStageDB, UnwindStageTestRunner,
175 };
176 use alloy_primitives::{address, Address, BlockNumber, B256};
177 use itertools::Itertools;
178 use reth_db_api::{
179 cursor::DbCursorRO,
180 models::{
181 sharded_key, sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx,
182 StoredBlockBodyIndices,
183 },
184 transaction::DbTx,
185 BlockNumberList,
186 };
187 use reth_provider::{providers::StaticFileWriter, DatabaseProviderFactory};
188 use reth_testing_utils::generators::{
189 self, random_block_range, random_changeset_range, random_contract_account_range,
190 BlockRangeParams,
191 };
192 use std::collections::BTreeMap;
193
194 const ADDRESS: Address = address!("0x0000000000000000000000000000000000000001");
195
196 const LAST_BLOCK_IN_FULL_SHARD: BlockNumber = NUM_OF_INDICES_IN_SHARD as BlockNumber;
197 const MAX_BLOCK: BlockNumber = NUM_OF_INDICES_IN_SHARD as BlockNumber + 2;
198
    /// Changeset entry for `ADDRESS` with no previous account info.
    const fn acc() -> AccountBeforeTx {
        AccountBeforeTx { address: ADDRESS, info: None }
    }
202
    /// Sharded key for `ADDRESS` with the given highest block number
    /// (`u64::MAX` denotes the open tail shard).
    const fn shard(shard_index: u64) -> ShardedKey<Address> {
        ShardedKey { key: ADDRESS, highest_block_number: shard_index }
    }
207
    /// Builds a `BlockNumberList` from a slice of block numbers.
    fn list(list: &[u64]) -> BlockNumberList {
        BlockNumberList::new(list.iter().copied()).unwrap()
    }
211
212 fn cast(
213 table: Vec<(ShardedKey<Address>, BlockNumberList)>,
214 ) -> BTreeMap<ShardedKey<Address>, Vec<u64>> {
215 table
216 .into_iter()
217 .map(|(k, v)| {
218 let v = v.iter().collect();
219 (k, v)
220 })
221 .collect()
222 }
223
    /// Seeds block body indices and one account changeset per block for `0..=MAX_BLOCK`.
    fn partial_setup(db: &TestStageDB) {
        db.commit(|tx| {
            // Setup each block with body indices and a single changeset entry.
            for block in 0..=MAX_BLOCK {
                tx.put::<tables::BlockBodyIndices>(
                    block,
                    StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
                )?;
                tx.put::<tables::AccountChangeSets>(block, acc())?;
            }
            Ok(())
        })
        .unwrap()
    }
239
    /// Executes the stage up to `run_to`, optionally starting from `input_checkpoint`,
    /// asserting it completes in a single run, then commits.
    fn run(db: &TestStageDB, run_to: u64, input_checkpoint: Option<BlockNumber>) {
        let input = ExecInput {
            target: Some(run_to),
            checkpoint: input_checkpoint
                .map(|block_number| StageCheckpoint { block_number, stage_checkpoint: None }),
        };
        let mut stage = IndexAccountHistoryStage::default();
        let provider = db.factory.database_provider_rw().unwrap();
        let out = stage.execute(&provider, input).unwrap();
        assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(run_to), done: true });
        provider.commit().unwrap();
    }
252
    /// Unwinds the stage from `unwind_from` down to `unwind_to`, asserting the resulting
    /// checkpoint, then commits.
    fn unwind(db: &TestStageDB, unwind_from: u64, unwind_to: u64) {
        let input = UnwindInput {
            checkpoint: StageCheckpoint::new(unwind_from),
            unwind_to,
            ..Default::default()
        };
        let mut stage = IndexAccountHistoryStage::default();
        let provider = db.factory.database_provider_rw().unwrap();
        let out = stage.unwind(&provider, input).unwrap();
        assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) });
        provider.commit().unwrap();
    }
265
    /// First sync from genesis: blocks 0..=3 land in the open shard; unwinding back to 0
    /// leaves only the genesis entry.
    #[tokio::test]
    async fn insert_index_to_genesis() {
        // init
        let db = TestStageDB::default();

        // setup
        partial_setup(&db);

        // run
        run(&db, 3, None);

        // verify
        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0, 1, 2, 3])]));

        // unwind
        unwind(&db, 3, 0);

        // verify initial state
        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0])]));
    }
288
    /// Appending to a shard that already holds indices: new blocks merge into the
    /// existing open shard; unwind restores the prior contents.
    #[tokio::test]
    async fn insert_index_to_not_empty_shard() {
        // init
        let db = TestStageDB::default();

        // setup: pre-populate the open shard with blocks 1..=3
        partial_setup(&db);
        db.commit(|tx| {
            tx.put::<tables::AccountsHistory>(shard(u64::MAX), list(&[1, 2, 3])).unwrap();
            Ok(())
        })
        .unwrap();

        // run
        run(&db, 5, Some(3));

        // verify
        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3, 4, 5])]));

        // unwind
        unwind(&db, 5, 3);

        // verify initial state
        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3])]));
    }
316
    /// Appending past a full shard: the full shard is closed under its highest block
    /// number and the overflow lands in a new open shard.
    #[tokio::test]
    async fn insert_index_to_full_shard() {
        // init
        let db = TestStageDB::default();
        let full_list = (1..=LAST_BLOCK_IN_FULL_SHARD).collect::<Vec<_>>();
        assert_eq!(full_list.len(), NUM_OF_INDICES_IN_SHARD);

        // setup: open shard is exactly at capacity
        partial_setup(&db);
        db.commit(|tx| {
            tx.put::<tables::AccountsHistory>(shard(u64::MAX), list(&full_list)).unwrap();
            Ok(())
        })
        .unwrap();

        // run
        run(&db, LAST_BLOCK_IN_FULL_SHARD + 2, Some(LAST_BLOCK_IN_FULL_SHARD));

        // verify: one closed shard plus a new open shard with the overflow
        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
        assert_eq!(
            table,
            BTreeMap::from([
                (shard(LAST_BLOCK_IN_FULL_SHARD), full_list.clone()),
                (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1, LAST_BLOCK_IN_FULL_SHARD + 2])
            ])
        );

        // unwind
        unwind(&db, LAST_BLOCK_IN_FULL_SHARD + 2, LAST_BLOCK_IN_FULL_SHARD);

        // verify initial state
        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), full_list)]));
    }
352
    /// Filling an almost-full open shard exactly to capacity keeps everything in the
    /// single open shard (no split).
    #[tokio::test]
    async fn insert_index_to_fill_shard() {
        // init
        let db = TestStageDB::default();
        let mut almost_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 2).collect::<Vec<_>>();

        // setup
        partial_setup(&db);
        db.commit(|tx| {
            tx.put::<tables::AccountsHistory>(shard(u64::MAX), list(&almost_full_list)).unwrap();
            Ok(())
        })
        .unwrap();

        // run
        run(&db, LAST_BLOCK_IN_FULL_SHARD, Some(LAST_BLOCK_IN_FULL_SHARD - 2));

        // verify
        almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD - 1);
        almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD);
        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list.clone())]));

        // unwind
        unwind(&db, LAST_BLOCK_IN_FULL_SHARD, LAST_BLOCK_IN_FULL_SHARD - 2);

        // verify initial state
        almost_full_list.pop();
        almost_full_list.pop();
        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list)]));

    }
387
    /// Filling the open shard to capacity and going one block past it: the now-full
    /// shard closes and the overflow block starts a fresh open shard.
    #[tokio::test]
    async fn insert_index_second_half_shard() {
        // init
        let db = TestStageDB::default();
        let mut almost_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 1).collect::<Vec<_>>();

        // setup
        partial_setup(&db);
        db.commit(|tx| {
            tx.put::<tables::AccountsHistory>(shard(u64::MAX), list(&almost_full_list)).unwrap();
            Ok(())
        })
        .unwrap();

        // run
        run(&db, LAST_BLOCK_IN_FULL_SHARD + 1, Some(LAST_BLOCK_IN_FULL_SHARD - 1));

        // verify
        almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD);
        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
        assert_eq!(
            table,
            BTreeMap::from([
                (shard(LAST_BLOCK_IN_FULL_SHARD), almost_full_list.clone()),
                (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1])
            ])
        );

        // unwind
        // NOTE(review): execution above ran to LAST_BLOCK_IN_FULL_SHARD + 1, but
        // `unwind_from` here is LAST_BLOCK_IN_FULL_SHARD — confirm this is intentional.
        unwind(&db, LAST_BLOCK_IN_FULL_SHARD, LAST_BLOCK_IN_FULL_SHARD - 1);

        // verify initial state
        almost_full_list.pop();
        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list)]));
    }
424
    /// With two closed shards already present, new blocks append to the open shard
    /// without disturbing the closed shards.
    #[tokio::test]
    async fn insert_index_to_third_shard() {
        // init
        let db = TestStageDB::default();
        let full_list = (1..=LAST_BLOCK_IN_FULL_SHARD).collect::<Vec<_>>();

        // setup: two closed shards plus an open shard with one entry
        partial_setup(&db);
        db.commit(|tx| {
            tx.put::<tables::AccountsHistory>(shard(1), list(&full_list)).unwrap();
            tx.put::<tables::AccountsHistory>(shard(2), list(&full_list)).unwrap();
            tx.put::<tables::AccountsHistory>(
                shard(u64::MAX),
                list(&[LAST_BLOCK_IN_FULL_SHARD + 1]),
            )
            .unwrap();
            Ok(())
        })
        .unwrap();

        run(&db, LAST_BLOCK_IN_FULL_SHARD + 2, Some(LAST_BLOCK_IN_FULL_SHARD + 1));

        // verify
        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
        assert_eq!(
            table,
            BTreeMap::from([
                (shard(1), full_list.clone()),
                (shard(2), full_list.clone()),
                (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1, LAST_BLOCK_IN_FULL_SHARD + 2])
            ])
        );

        // unwind
        unwind(&db, LAST_BLOCK_IN_FULL_SHARD + 2, LAST_BLOCK_IN_FULL_SHARD + 1);

        // verify initial state
        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
        assert_eq!(
            table,
            BTreeMap::from([
                (shard(1), full_list.clone()),
                (shard(2), full_list),
                (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1])
            ])
        );
    }
472
    /// With `PruneMode::Before(36)` the checkpoint is fast-forwarded past prunable
    /// blocks, so only changesets at blocks >= 36 are indexed.
    #[tokio::test]
    async fn insert_index_with_prune_mode() {
        // init
        let db = TestStageDB::default();

        // setup
        db.commit(|tx| {
            // we just need the first and last block body indices
            tx.put::<tables::BlockBodyIndices>(
                0,
                StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
            )
            .unwrap();

            tx.put::<tables::BlockBodyIndices>(
                100,
                StoredBlockBodyIndices { tx_count: 5, ..Default::default() },
            )
            .unwrap();

            // changesets that will be applied to the history index
            tx.put::<tables::AccountChangeSets>(20, acc()).unwrap();
            tx.put::<tables::AccountChangeSets>(36, acc()).unwrap();
            tx.put::<tables::AccountChangeSets>(100, acc()).unwrap();
            Ok(())
        })
        .unwrap();

        // run
        let input = ExecInput { target: Some(20000), ..Default::default() };
        let mut stage = IndexAccountHistoryStage {
            prune_mode: Some(PruneMode::Before(36)),
            ..Default::default()
        };
        let provider = db.factory.database_provider_rw().unwrap();
        let out = stage.execute(&provider, input).unwrap();
        assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(20000), done: true });
        provider.commit().unwrap();

        // verify: block 20 is skipped as prunable; 36 and 100 are indexed
        let table = cast(db.table::<tables::AccountsHistory>().unwrap());
        assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![36, 100])]));

        // unwind
        unwind(&db, 20000, 0);

        // verify initial state
        let table = db.table::<tables::AccountsHistory>().unwrap();
        assert!(table.is_empty());
    }
523
524 stage_test_suite_ext!(IndexAccountHistoryTestRunner, index_account_history);
525
    /// Test runner wiring [`IndexAccountHistoryStage`] into the shared stage test suite.
    struct IndexAccountHistoryTestRunner {
        // Test database handle shared with the suite macros.
        pub(crate) db: TestStageDB,
        // Stage configuration forwarded by `stage()`.
        commit_threshold: u64,
        prune_mode: Option<PruneMode>,
    }
531
532 impl Default for IndexAccountHistoryTestRunner {
533 fn default() -> Self {
534 Self { db: TestStageDB::default(), commit_threshold: 1000, prune_mode: None }
535 }
536 }
537
    impl StageTestRunner for IndexAccountHistoryTestRunner {
        type S = IndexAccountHistoryStage;

        /// Database handle used by the shared test suite.
        fn db(&self) -> &TestStageDB {
            &self.db
        }

        /// Builds the stage under test from the runner's configuration.
        fn stage(&self) -> Self::S {
            Self::S {
                commit_threshold: self.commit_threshold,
                prune_mode: self.prune_mode,
                etl_config: EtlConfig::default(),
            }
        }
    }
553
    impl ExecuteStageTestRunner for IndexAccountHistoryTestRunner {
        type Seed = ();

        /// Seeds random accounts, blocks and account changesets over the execution range.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_process = input.checkpoint().block_number;
            let start = stage_process + 1;
            let end = input.target();
            let mut rng = generators::rng();

            let num_of_accounts = 31;
            let accounts = random_contract_account_range(&mut rng, &mut (0..num_of_accounts))
                .into_iter()
                .collect::<BTreeMap<_, _>>();

            let blocks = random_block_range(
                &mut rng,
                start..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..3, ..Default::default() },
            );

            let (changesets, _) = random_changeset_range(
                &mut rng,
                blocks.iter(),
                accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
                0..3,
                0..256,
            );

            // insert the generated changesets starting at the first unsynced block
            self.db.insert_changesets(changesets, Some(start))?;

            Ok(())
        }

        /// Rebuilds the expected sharded index from the changeset table and compares it
        /// against what the stage wrote to [`tables::AccountsHistory`].
        fn validate_execution(
            &self,
            input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            if let Some(output) = output {
                let start_block = input.next_block();
                let end_block = output.checkpoint.block_number;
                if start_block > end_block {
                    return Ok(())
                }

                assert_eq!(
                    output,
                    ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }
                );

                let provider = self.db.factory.provider()?;
                let mut changeset_cursor =
                    provider.tx_ref().cursor_read::<tables::AccountChangeSets>()?;

                // Collect, per address, all block numbers at which it changed.
                let account_transitions =
                    changeset_cursor.walk_range(start_block..=end_block)?.try_fold(
                        BTreeMap::new(),
                        |mut accounts: BTreeMap<Address, Vec<u64>>,
                         entry|
                         -> Result<_, TestRunnerError> {
                            let (index, account) = entry?;
                            accounts.entry(account.address).or_default().push(index);
                            Ok(accounts)
                        },
                    )?;

                // Split each address's block list into full shards plus the open
                // (u64::MAX-keyed) tail shard, mirroring the stage's sharding.
                let mut result = BTreeMap::new();
                for (address, indices) in account_transitions {
                    let mut chunks = indices
                        .iter()
                        .chunks(sharded_key::NUM_OF_INDICES_IN_SHARD)
                        .into_iter()
                        .map(|chunks| chunks.copied().collect::<Vec<_>>())
                        .collect::<Vec<Vec<_>>>();
                    let last_chunk = chunks.pop();

                    for list in chunks {
                        result.insert(
                            ShardedKey::new(
                                address,
                                // chunks() never yields an empty list, so last() is Some
                                *list.last().expect("Chuck does not return empty list")
                                    as BlockNumber,
                            ),
                            list,
                        );
                    }

                    if let Some(last_list) = last_chunk {
                        result.insert(ShardedKey::new(address, u64::MAX), last_list);
                    };
                }

                let table = cast(self.db.table::<tables::AccountsHistory>().unwrap());
                assert_eq!(table, result);
            }
            Ok(())
        }
    }
654
    impl UnwindStageTestRunner for IndexAccountHistoryTestRunner {
        /// After a full unwind the history table must be empty.
        fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
            let table = self.db.table::<tables::AccountsHistory>().unwrap();
            assert!(table.is_empty());
            Ok(())
        }
    }
662
663 mod rocksdb_tests {
664 use super::*;
665 use reth_provider::{
666 providers::StaticFileWriter, RocksDBProviderFactory, StaticFileProviderFactory,
667 };
668 use reth_static_file_types::StaticFileSegment;
669 use reth_storage_api::StorageSettings;
670
        /// Seeds v2-storage test data: enables `StorageSettings::v2`, writes block body
        /// indices to the database, and appends one account changeset per block to the
        /// static-file segment.
        fn setup_v2_account_data(db: &TestStageDB, block_range: std::ops::RangeInclusive<u64>) {
            // Enable v2 storage so the stage takes the RocksDB path.
            db.factory.set_storage_settings_cache(StorageSettings::v2());

            db.commit(|tx| {
                for block in block_range.clone() {
                    tx.put::<tables::BlockBodyIndices>(
                        block,
                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
                    )?;
                }
                Ok(())
            })
            .unwrap();

            // The v2 collector reads account changesets from static files.
            let static_file_provider = db.factory.static_file_provider();
            let mut writer =
                static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            for block in block_range {
                writer.append_account_changeset(vec![acc()], block).unwrap();
            }
            writer.commit().unwrap();
        }
695
        /// With v2 storage enabled, execution writes history to RocksDB and leaves the
        /// MDBX table untouched.
        #[tokio::test]
        async fn execute_writes_to_rocksdb_when_enabled() {
            let db = TestStageDB::default();
            setup_v2_account_data(&db, 0..=10);

            let input = ExecInput { target: Some(10), ..Default::default() };
            let mut stage = IndexAccountHistoryStage::default();
            let provider = db.factory.database_provider_rw().unwrap();
            let out = stage.execute(&provider, input).unwrap();
            assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
            provider.commit().unwrap();

            // MDBX should not receive any history entries on the v2 path.
            let mdbx_table = db.table::<tables::AccountsHistory>().unwrap();
            assert!(
                mdbx_table.is_empty(),
                "MDBX AccountsHistory should be empty when RocksDB is enabled"
            );

            // RocksDB should hold the full 0..=10 index in the open shard.
            let rocksdb = db.factory.rocksdb_provider();
            let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
            assert!(result.is_some(), "RocksDB should contain account history");

            let block_list = result.unwrap();
            let blocks: Vec<u64> = block_list.iter().collect();
            assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
        }
726
        /// Unwind on the v2 path removes indices above the unwind target from RocksDB.
        #[tokio::test]
        async fn unwind_works_when_rocksdb_enabled() {
            let db = TestStageDB::default();
            setup_v2_account_data(&db, 0..=10);

            let input = ExecInput { target: Some(10), ..Default::default() };
            let mut stage = IndexAccountHistoryStage::default();
            let provider = db.factory.database_provider_rw().unwrap();
            let out = stage.execute(&provider, input).unwrap();
            assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
            provider.commit().unwrap();

            // sanity-check data exists before unwinding
            let rocksdb = db.factory.rocksdb_provider();
            let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
            assert!(result.is_some(), "RocksDB should have data before unwind");
            let blocks_before: Vec<u64> = result.unwrap().iter().collect();
            assert_eq!(blocks_before, (0..=10).collect::<Vec<_>>());

            // unwind from 10 down to 5
            let unwind_input =
                UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 5, bad_block: None };
            let provider = db.factory.database_provider_rw().unwrap();
            let out = stage.unwind(&provider, unwind_input).unwrap();
            assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(5) });
            provider.commit().unwrap();

            // only blocks 0..=5 should remain
            let rocksdb = db.factory.rocksdb_provider();
            let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
            assert!(result.is_some(), "RocksDB should still have data after unwind");
            let blocks_after: Vec<u64> = result.unwrap().iter().collect();
            assert_eq!(blocks_after, (0..=5).collect::<Vec<_>>(), "Should only have blocks 0-5");
        }
762
        /// Two-phase sync on the v2 path: a second run merges new blocks into the
        /// existing RocksDB shard.
        #[tokio::test]
        async fn execute_incremental_sync() {
            let db = TestStageDB::default();
            setup_v2_account_data(&db, 0..=10);

            // first run: genesis up to block 5
            let input = ExecInput { target: Some(5), ..Default::default() };
            let mut stage = IndexAccountHistoryStage::default();
            let provider = db.factory.database_provider_rw().unwrap();
            let out = stage.execute(&provider, input).unwrap();
            assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
            provider.commit().unwrap();

            let rocksdb = db.factory.rocksdb_provider();
            let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
            assert!(result.is_some());
            let blocks: Vec<u64> = result.unwrap().iter().collect();
            assert_eq!(blocks, (0..=5).collect::<Vec<_>>());

            // second run: blocks 6..=10 starting from the checkpoint
            let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(5)) };
            let provider = db.factory.database_provider_rw().unwrap();
            let out = stage.execute(&provider, input).unwrap();
            assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
            provider.commit().unwrap();

            let rocksdb = db.factory.rocksdb_provider();
            let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
            assert!(result.is_some(), "RocksDB should have merged data");
            let blocks: Vec<u64> = result.unwrap().iter().collect();
            assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
        }
794 }
795}