1use super::{collect_history_indices, collect_storage_history_indices};
2use crate::{stages::utils::load_storage_history, StageCheckpoint, StageId};
3use reth_config::config::{EtlConfig, IndexHistoryConfig};
4use reth_db_api::{
5 models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress},
6 tables,
7 transaction::DbTxMut,
8 Tables,
9};
10use reth_provider::{
11 DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
12 RocksDBProviderFactory, StaticFileProviderFactory, StorageChangeSetReader,
13 StorageSettingsCache,
14};
15use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
16use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
17use std::fmt::Debug;
18use tracing::info;
19
20#[derive(Debug)]
24pub struct IndexStorageHistoryStage {
25 pub commit_threshold: u64,
28 pub prune_mode: Option<PruneMode>,
30 pub etl_config: EtlConfig,
32}
33
34impl IndexStorageHistoryStage {
35 pub const fn new(
37 config: IndexHistoryConfig,
38 etl_config: EtlConfig,
39 prune_mode: Option<PruneMode>,
40 ) -> Self {
41 Self { commit_threshold: config.commit_threshold, etl_config, prune_mode }
42 }
43}
44
45impl Default for IndexStorageHistoryStage {
46 fn default() -> Self {
47 Self { commit_threshold: 100_000, prune_mode: None, etl_config: EtlConfig::default() }
48 }
49}
50
51impl<Provider> Stage<Provider> for IndexStorageHistoryStage
52where
53 Provider: DBProvider<Tx: DbTxMut>
54 + HistoryWriter
55 + PruneCheckpointReader
56 + PruneCheckpointWriter
57 + StorageSettingsCache
58 + RocksDBProviderFactory
59 + StorageChangeSetReader
60 + StaticFileProviderFactory
61 + reth_provider::NodePrimitivesProvider,
62{
63 fn id(&self) -> StageId {
65 StageId::IndexStorageHistory
66 }
67
68 fn execute(
70 &mut self,
71 provider: &Provider,
72 mut input: ExecInput,
73 ) -> Result<ExecOutput, StageError> {
74 if let Some((target_prunable_block, prune_mode)) = self
75 .prune_mode
76 .map(|mode| {
77 mode.prune_target_block(
78 input.target(),
79 PruneSegment::StorageHistory,
80 PrunePurpose::User,
81 )
82 })
83 .transpose()?
84 .flatten() &&
85 target_prunable_block > input.checkpoint().block_number
86 {
87 input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
88
89 if provider.get_prune_checkpoint(PruneSegment::StorageHistory)?.is_none() {
92 provider.save_prune_checkpoint(
93 PruneSegment::StorageHistory,
94 PruneCheckpoint {
95 block_number: Some(target_prunable_block),
96 tx_number: None,
97 prune_mode,
98 },
99 )?;
100 }
101 }
102
103 if input.target_reached() {
104 return Ok(ExecOutput::done(input.checkpoint()))
105 }
106
107 let mut range = input.next_block_range();
108 let first_sync = input.checkpoint().block_number == 0;
109 let use_rocksdb = provider.cached_storage_settings().storage_v2;
110
111 if first_sync {
114 if use_rocksdb {
115 provider.rocksdb_provider().clear::<tables::StoragesHistory>()?;
120 } else {
121 provider.tx_ref().clear::<tables::StoragesHistory>()?;
122 }
123 range = 0..=*input.next_block_range().end();
124 }
125
126 info!(target: "sync::stages::index_storage_history::exec", ?first_sync, ?use_rocksdb, "Collecting indices");
127 let collector = if provider.cached_storage_settings().storage_v2 {
128 collect_storage_history_indices(provider, range.clone(), &self.etl_config)?
129 } else {
130 collect_history_indices::<_, tables::StorageChangeSets, tables::StoragesHistory, _>(
131 provider,
132 BlockNumberAddress::range(range.clone()),
133 |AddressStorageKey((address, storage_key)), highest_block_number| {
134 StorageShardedKey::new(address, storage_key, highest_block_number)
135 },
136 |(key, value)| (key.block_number(), AddressStorageKey((key.address(), value.key))),
137 &self.etl_config,
138 )?
139 };
140
141 info!(target: "sync::stages::index_storage_history::exec", "Loading indices into database");
142
143 provider.with_rocksdb_batch_auto_commit(|rocksdb_batch| {
144 let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?;
145 load_storage_history(collector, first_sync, &mut writer)
146 .map_err(|e| reth_provider::ProviderError::other(Box::new(e)))?;
147 Ok(((), writer.into_raw_rocksdb_batch()))
148 })?;
149
150 if use_rocksdb {
151 provider.commit_pending_rocksdb_batches()?;
152 provider.rocksdb_provider().flush(&[Tables::StoragesHistory.name()])?;
153 }
154
155 Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
156 }
157
158 fn unwind(
160 &mut self,
161 provider: &Provider,
162 input: UnwindInput,
163 ) -> Result<UnwindOutput, StageError> {
164 let (range, unwind_progress, _) =
165 input.unwind_block_range_with_threshold(self.commit_threshold);
166
167 provider.unwind_storage_history_indices_range(range)?;
168
169 Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
170 }
171}
172
173#[cfg(test)]
174mod tests {
175 use super::*;
176 use crate::test_utils::{
177 stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
178 TestStageDB, UnwindStageTestRunner,
179 };
180 use alloy_primitives::{address, b256, Address, BlockNumber, B256, U256};
181 use itertools::Itertools;
182 use reth_db_api::{
183 cursor::DbCursorRO,
184 models::{
185 sharded_key, storage_sharded_key::NUM_OF_INDICES_IN_SHARD, ShardedKey,
186 StoredBlockBodyIndices,
187 },
188 transaction::DbTx,
189 BlockNumberList,
190 };
191 use reth_primitives_traits::StorageEntry;
192 use reth_provider::{providers::StaticFileWriter, DatabaseProviderFactory};
193 use reth_testing_utils::generators::{
194 self, random_block_range, random_changeset_range, random_contract_account_range,
195 BlockRangeParams,
196 };
197 use std::collections::BTreeMap;
198
199 const ADDRESS: Address = address!("0x0000000000000000000000000000000000000001");
200 const STORAGE_KEY: B256 =
201 b256!("0x0000000000000000000000000000000000000000000000000000000000000001");
202
203 const LAST_BLOCK_IN_FULL_SHARD: BlockNumber = NUM_OF_INDICES_IN_SHARD as BlockNumber;
204 const MAX_BLOCK: BlockNumber = NUM_OF_INDICES_IN_SHARD as BlockNumber + 2;
205
206 const fn storage(key: B256) -> StorageEntry {
207 StorageEntry { key, value: U256::ZERO }
209 }
210
211 const fn block_number_address(block_number: u64) -> BlockNumberAddress {
212 BlockNumberAddress((block_number, ADDRESS))
213 }
214
215 const fn shard(shard_index: u64) -> StorageShardedKey {
217 StorageShardedKey {
218 address: ADDRESS,
219 sharded_key: ShardedKey { key: STORAGE_KEY, highest_block_number: shard_index },
220 }
221 }
222
223 fn list(list: &[u64]) -> BlockNumberList {
224 BlockNumberList::new(list.iter().copied()).unwrap()
225 }
226
227 fn cast(
228 table: Vec<(StorageShardedKey, BlockNumberList)>,
229 ) -> BTreeMap<StorageShardedKey, Vec<u64>> {
230 table
231 .into_iter()
232 .map(|(k, v)| {
233 let v = v.iter().collect();
234 (k, v)
235 })
236 .collect()
237 }
238
239 fn partial_setup(db: &TestStageDB) {
240 db.commit(|tx| {
242 for block in 0..=MAX_BLOCK {
243 tx.put::<tables::BlockBodyIndices>(
244 block,
245 StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
246 )?;
247 tx.put::<tables::StorageChangeSets>(
249 block_number_address(block),
250 storage(STORAGE_KEY),
251 )?;
252 }
253 Ok(())
254 })
255 .unwrap()
256 }
257
258 fn run(db: &TestStageDB, run_to: u64, input_checkpoint: Option<BlockNumber>) {
259 let input = ExecInput {
260 target: Some(run_to),
261 checkpoint: input_checkpoint
262 .map(|block_number| StageCheckpoint { block_number, stage_checkpoint: None }),
263 };
264 let mut stage = IndexStorageHistoryStage::default();
265 let provider = db.factory.database_provider_rw().unwrap();
266 let out = stage.execute(&provider, input).unwrap();
267 assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(run_to), done: true });
268 provider.commit().unwrap();
269 }
270
271 fn unwind(db: &TestStageDB, unwind_from: u64, unwind_to: u64) {
272 let input = UnwindInput {
273 checkpoint: StageCheckpoint::new(unwind_from),
274 unwind_to,
275 ..Default::default()
276 };
277 let mut stage = IndexStorageHistoryStage::default();
278 let provider = db.factory.database_provider_rw().unwrap();
279 let out = stage.unwind(&provider, input).unwrap();
280 assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) });
281 provider.commit().unwrap();
282 }
283
284 #[tokio::test]
285 async fn insert_index_to_genesis() {
286 let db = TestStageDB::default();
288
289 partial_setup(&db);
291
292 run(&db, 3, None);
294
295 let table = cast(db.table::<tables::StoragesHistory>().unwrap());
297 assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0, 1, 2, 3])]));
298
299 unwind(&db, 5, 0);
301
302 let table = cast(db.table::<tables::StoragesHistory>().unwrap());
304 assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0])]));
305 }
306
307 #[tokio::test]
308 async fn insert_index_to_not_empty_shard() {
309 let db = TestStageDB::default();
311
312 partial_setup(&db);
314 db.commit(|tx| {
315 tx.put::<tables::StoragesHistory>(shard(u64::MAX), list(&[1, 2, 3])).unwrap();
316 Ok(())
317 })
318 .unwrap();
319
320 run(&db, 5, Some(3));
322
323 let table = cast(db.table::<tables::StoragesHistory>().unwrap());
325 assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3, 4, 5])]));
326
327 unwind(&db, 5, 3);
329
330 let table = cast(db.table::<tables::StoragesHistory>().unwrap());
332 assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3])]));
333 }
334
335 #[tokio::test]
336 async fn insert_index_to_full_shard() {
337 let db = TestStageDB::default();
339 let full_list = (1..=LAST_BLOCK_IN_FULL_SHARD).collect::<Vec<_>>();
341
342 partial_setup(&db);
344 db.commit(|tx| {
345 tx.put::<tables::StoragesHistory>(shard(u64::MAX), list(&full_list)).unwrap();
346 Ok(())
347 })
348 .unwrap();
349
350 run(&db, LAST_BLOCK_IN_FULL_SHARD + 2, Some(LAST_BLOCK_IN_FULL_SHARD));
352
353 let table = cast(db.table::<tables::StoragesHistory>().unwrap());
355 assert_eq!(
356 table,
357 BTreeMap::from([
358 (shard(LAST_BLOCK_IN_FULL_SHARD), full_list.clone()),
359 (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1, LAST_BLOCK_IN_FULL_SHARD + 2])
360 ])
361 );
362
363 unwind(&db, LAST_BLOCK_IN_FULL_SHARD + 2, LAST_BLOCK_IN_FULL_SHARD);
365
366 let table = cast(db.table::<tables::StoragesHistory>().unwrap());
368 assert_eq!(table, BTreeMap::from([(shard(u64::MAX), full_list)]));
369 }
370
371 #[tokio::test]
372 async fn insert_index_to_fill_shard() {
373 let db = TestStageDB::default();
375 let mut almost_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 2).collect::<Vec<_>>();
376
377 partial_setup(&db);
379 db.commit(|tx| {
380 tx.put::<tables::StoragesHistory>(shard(u64::MAX), list(&almost_full_list)).unwrap();
381 Ok(())
382 })
383 .unwrap();
384
385 run(&db, LAST_BLOCK_IN_FULL_SHARD, Some(LAST_BLOCK_IN_FULL_SHARD - 2));
387
388 almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD - 1);
390 almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD);
391 let table = cast(db.table::<tables::StoragesHistory>().unwrap());
392 assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list.clone())]));
393
394 unwind(&db, LAST_BLOCK_IN_FULL_SHARD, LAST_BLOCK_IN_FULL_SHARD - 2);
396
397 almost_full_list.pop();
399 almost_full_list.pop();
400 let table = cast(db.table::<tables::StoragesHistory>().unwrap());
401 assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list)]));
402
403 }
405
406 #[tokio::test]
407 async fn insert_index_second_half_shard() {
408 let db = TestStageDB::default();
410 let mut almost_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 1).collect::<Vec<_>>();
411
412 partial_setup(&db);
414 db.commit(|tx| {
415 tx.put::<tables::StoragesHistory>(shard(u64::MAX), list(&almost_full_list)).unwrap();
416 Ok(())
417 })
418 .unwrap();
419
420 run(&db, LAST_BLOCK_IN_FULL_SHARD + 1, Some(LAST_BLOCK_IN_FULL_SHARD - 1));
422
423 almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD);
425 let table = cast(db.table::<tables::StoragesHistory>().unwrap());
426 assert_eq!(
427 table,
428 BTreeMap::from([
429 (shard(LAST_BLOCK_IN_FULL_SHARD), almost_full_list.clone()),
430 (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1])
431 ])
432 );
433
434 unwind(&db, LAST_BLOCK_IN_FULL_SHARD, LAST_BLOCK_IN_FULL_SHARD - 1);
436
437 almost_full_list.pop();
439 let table = cast(db.table::<tables::StoragesHistory>().unwrap());
440 assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list)]));
441 }
442
443 #[tokio::test]
444 async fn insert_index_to_third_shard() {
445 let db = TestStageDB::default();
447 let full_list = (1..=LAST_BLOCK_IN_FULL_SHARD).collect::<Vec<_>>();
448
449 partial_setup(&db);
451 db.commit(|tx| {
452 tx.put::<tables::StoragesHistory>(shard(1), list(&full_list)).unwrap();
453 tx.put::<tables::StoragesHistory>(shard(2), list(&full_list)).unwrap();
454 tx.put::<tables::StoragesHistory>(
455 shard(u64::MAX),
456 list(&[LAST_BLOCK_IN_FULL_SHARD + 1]),
457 )
458 .unwrap();
459 Ok(())
460 })
461 .unwrap();
462
463 run(&db, LAST_BLOCK_IN_FULL_SHARD + 2, Some(LAST_BLOCK_IN_FULL_SHARD + 1));
464
465 let table = cast(db.table::<tables::StoragesHistory>().unwrap());
467 assert_eq!(
468 table,
469 BTreeMap::from([
470 (shard(1), full_list.clone()),
471 (shard(2), full_list.clone()),
472 (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1, LAST_BLOCK_IN_FULL_SHARD + 2])
473 ])
474 );
475
476 unwind(&db, LAST_BLOCK_IN_FULL_SHARD + 2, LAST_BLOCK_IN_FULL_SHARD + 1);
478
479 let table = cast(db.table::<tables::StoragesHistory>().unwrap());
481 assert_eq!(
482 table,
483 BTreeMap::from([
484 (shard(1), full_list.clone()),
485 (shard(2), full_list),
486 (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1])
487 ])
488 );
489 }
490
491 #[tokio::test]
492 async fn insert_index_with_prune_mode() {
493 let db = TestStageDB::default();
495
496 db.commit(|tx| {
498 tx.put::<tables::BlockBodyIndices>(
500 0,
501 StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
502 )
503 .unwrap();
504
505 tx.put::<tables::BlockBodyIndices>(
506 100,
507 StoredBlockBodyIndices { tx_count: 5, ..Default::default() },
508 )
509 .unwrap();
510
511 tx.put::<tables::StorageChangeSets>(block_number_address(20), storage(STORAGE_KEY))
513 .unwrap();
514 tx.put::<tables::StorageChangeSets>(block_number_address(36), storage(STORAGE_KEY))
515 .unwrap();
516 tx.put::<tables::StorageChangeSets>(block_number_address(100), storage(STORAGE_KEY))
517 .unwrap();
518 Ok(())
519 })
520 .unwrap();
521
522 let input = ExecInput { target: Some(20000), ..Default::default() };
524 let mut stage = IndexStorageHistoryStage {
525 prune_mode: Some(PruneMode::Before(36)),
526 ..Default::default()
527 };
528 let provider = db.factory.database_provider_rw().unwrap();
529 let out = stage.execute(&provider, input).unwrap();
530 assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(20000), done: true });
531 provider.commit().unwrap();
532
533 let table = cast(db.table::<tables::StoragesHistory>().unwrap());
535 assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![36, 100])]));
536
537 unwind(&db, 20000, 0);
539
540 let table = db.table::<tables::StoragesHistory>().unwrap();
542 assert!(table.is_empty());
543 }
544
545 stage_test_suite_ext!(IndexStorageHistoryTestRunner, index_storage_history);
546
547 struct IndexStorageHistoryTestRunner {
548 pub(crate) db: TestStageDB,
549 commit_threshold: u64,
550 prune_mode: Option<PruneMode>,
551 }
552
553 impl Default for IndexStorageHistoryTestRunner {
554 fn default() -> Self {
555 Self { db: TestStageDB::default(), commit_threshold: 1000, prune_mode: None }
556 }
557 }
558
559 impl StageTestRunner for IndexStorageHistoryTestRunner {
560 type S = IndexStorageHistoryStage;
561
562 fn db(&self) -> &TestStageDB {
563 &self.db
564 }
565
566 fn stage(&self) -> Self::S {
567 Self::S {
568 commit_threshold: self.commit_threshold,
569 prune_mode: self.prune_mode,
570 etl_config: EtlConfig::default(),
571 }
572 }
573 }
574
575 impl ExecuteStageTestRunner for IndexStorageHistoryTestRunner {
576 type Seed = ();
577
578 fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
579 let stage_process = input.checkpoint().block_number;
580 let start = stage_process + 1;
581 let end = input.target();
582 let mut rng = generators::rng();
583
584 let num_of_accounts = 31;
585 let accounts = random_contract_account_range(&mut rng, &mut (0..num_of_accounts))
586 .into_iter()
587 .collect::<BTreeMap<_, _>>();
588
589 let blocks = random_block_range(
590 &mut rng,
591 start..=end,
592 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..3, ..Default::default() },
593 );
594
595 let (changesets, _) = random_changeset_range(
596 &mut rng,
597 blocks.iter(),
598 accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
599 0..3,
600 0..u64::MAX,
601 );
602
603 self.db.insert_changesets(changesets, Some(start))?;
605
606 Ok(())
607 }
608
609 fn validate_execution(
610 &self,
611 input: ExecInput,
612 output: Option<ExecOutput>,
613 ) -> Result<(), TestRunnerError> {
614 if let Some(output) = output {
615 let start_block = input.next_block();
616 let end_block = output.checkpoint.block_number;
617 if start_block > end_block {
618 return Ok(())
619 }
620
621 assert_eq!(
622 output,
623 ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }
624 );
625
626 let provider = self.db.factory.provider()?;
627 let mut changeset_cursor =
628 provider.tx_ref().cursor_read::<tables::StorageChangeSets>()?;
629
630 let storage_transitions = changeset_cursor
631 .walk_range(BlockNumberAddress::range(start_block..=end_block))?
632 .try_fold(
633 BTreeMap::new(),
634 |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
635 entry|
636 -> Result<_, TestRunnerError> {
637 let (index, storage) = entry?;
638 storages
639 .entry((index.address(), storage.key))
640 .or_default()
641 .push(index.block_number());
642 Ok(storages)
643 },
644 )?;
645
646 let mut result = BTreeMap::new();
647 for (partial_key, indices) in storage_transitions {
648 let mut chunks = indices
650 .iter()
651 .chunks(sharded_key::NUM_OF_INDICES_IN_SHARD)
652 .into_iter()
653 .map(|chunks| chunks.copied().collect::<Vec<u64>>())
654 .collect::<Vec<Vec<_>>>();
655 let last_chunk = chunks.pop();
656
657 for list in chunks {
658 result.insert(
659 StorageShardedKey::new(
660 partial_key.0,
661 partial_key.1,
662 *list.last().expect("Chuck does not return empty list")
663 as BlockNumber,
664 ),
665 list,
666 );
667 }
668
669 if let Some(last_list) = last_chunk {
670 result.insert(
671 StorageShardedKey::new(partial_key.0, partial_key.1, u64::MAX),
672 last_list,
673 );
674 };
675 }
676
677 let table = cast(self.db.table::<tables::StoragesHistory>().unwrap());
678 assert_eq!(table, result);
679 }
680 Ok(())
681 }
682 }
683
684 impl UnwindStageTestRunner for IndexStorageHistoryTestRunner {
685 fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
686 let table = self.db.table::<tables::StoragesHistory>().unwrap();
687 assert!(table.is_empty());
688 Ok(())
689 }
690 }
691
692 mod rocksdb_tests {
693 use super::*;
694 use reth_db_api::models::StorageBeforeTx;
695 use reth_provider::{providers::StaticFileWriter, RocksDBProviderFactory};
696 use reth_static_file_types::StaticFileSegment;
697 use reth_storage_api::StorageSettings;
698
699 fn setup_v2_storage_data(db: &TestStageDB, block_range: std::ops::RangeInclusive<u64>) {
702 db.factory.set_storage_settings_cache(StorageSettings::v2());
703
704 db.commit(|tx| {
705 for block in block_range.clone() {
706 tx.put::<tables::BlockBodyIndices>(
707 block,
708 StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
709 )?;
710 }
711 Ok(())
712 })
713 .unwrap();
714
715 let static_file_provider = db.factory.static_file_provider();
716 let mut writer =
717 static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
718 for block in block_range {
719 writer
720 .append_storage_changeset(
721 vec![StorageBeforeTx {
722 address: ADDRESS,
723 key: STORAGE_KEY,
724 value: U256::ZERO,
725 }],
726 block,
727 )
728 .unwrap();
729 }
730 writer.commit().unwrap();
731 }
732
733 #[tokio::test]
736 async fn execute_writes_to_rocksdb_when_enabled() {
737 let db = TestStageDB::default();
738 setup_v2_storage_data(&db, 0..=10);
739
740 let input = ExecInput { target: Some(10), ..Default::default() };
741 let mut stage = IndexStorageHistoryStage::default();
742 let provider = db.factory.database_provider_rw().unwrap();
743 let out = stage.execute(&provider, input).unwrap();
744 assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
745 provider.commit().unwrap();
746
747 let mdbx_table = db.table::<tables::StoragesHistory>().unwrap();
748 assert!(
749 mdbx_table.is_empty(),
750 "MDBX StoragesHistory should be empty when RocksDB is enabled"
751 );
752
753 let rocksdb = db.factory.rocksdb_provider();
754 let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
755 assert!(result.is_some(), "RocksDB should contain storage history");
756
757 let block_list = result.unwrap();
758 let blocks: Vec<u64> = block_list.iter().collect();
759 assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
760 }
761
762 #[tokio::test]
764 async fn unwind_works_when_rocksdb_enabled() {
765 let db = TestStageDB::default();
766 setup_v2_storage_data(&db, 0..=10);
767
768 let input = ExecInput { target: Some(10), ..Default::default() };
769 let mut stage = IndexStorageHistoryStage::default();
770 let provider = db.factory.database_provider_rw().unwrap();
771 let out = stage.execute(&provider, input).unwrap();
772 assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
773 provider.commit().unwrap();
774
775 let rocksdb = db.factory.rocksdb_provider();
776 let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
777 assert!(result.is_some(), "RocksDB should have data before unwind");
778 let blocks_before: Vec<u64> = result.unwrap().iter().collect();
779 assert_eq!(blocks_before, (0..=10).collect::<Vec<_>>());
780
781 let unwind_input =
782 UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 5, bad_block: None };
783 let provider = db.factory.database_provider_rw().unwrap();
784 let out = stage.unwind(&provider, unwind_input).unwrap();
785 assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(5) });
786 provider.commit().unwrap();
787
788 let rocksdb = db.factory.rocksdb_provider();
789 let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
790 assert!(result.is_some(), "RocksDB should still have data after partial unwind");
791 let blocks_after: Vec<u64> = result.unwrap().iter().collect();
792 assert_eq!(
793 blocks_after,
794 (0..=5).collect::<Vec<_>>(),
795 "Should only have blocks 0-5 after unwind to block 5"
796 );
797 }
798
799 #[tokio::test]
801 async fn unwind_to_zero_keeps_block_zero() {
802 let db = TestStageDB::default();
803 setup_v2_storage_data(&db, 0..=5);
804
805 let input = ExecInput { target: Some(5), ..Default::default() };
806 let mut stage = IndexStorageHistoryStage::default();
807 let provider = db.factory.database_provider_rw().unwrap();
808 let out = stage.execute(&provider, input).unwrap();
809 assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
810 provider.commit().unwrap();
811
812 let rocksdb = db.factory.rocksdb_provider();
813 let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
814 assert!(result.is_some(), "RocksDB should have data before unwind");
815
816 let unwind_input =
817 UnwindInput { checkpoint: StageCheckpoint::new(5), unwind_to: 0, bad_block: None };
818 let provider = db.factory.database_provider_rw().unwrap();
819 let out = stage.unwind(&provider, unwind_input).unwrap();
820 assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(0) });
821 provider.commit().unwrap();
822
823 let rocksdb = db.factory.rocksdb_provider();
824 let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
825 assert!(result.is_some(), "RocksDB should still have block 0 history");
826 let blocks_after: Vec<u64> = result.unwrap().iter().collect();
827 assert_eq!(blocks_after, vec![0], "Should only have block 0 after unwinding to 0");
828 }
829
830 #[tokio::test]
832 async fn execute_incremental_sync() {
833 let db = TestStageDB::default();
834 setup_v2_storage_data(&db, 0..=10);
835
836 let input = ExecInput { target: Some(5), ..Default::default() };
837 let mut stage = IndexStorageHistoryStage::default();
838 let provider = db.factory.database_provider_rw().unwrap();
839 let out = stage.execute(&provider, input).unwrap();
840 assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
841 provider.commit().unwrap();
842
843 let rocksdb = db.factory.rocksdb_provider();
844 let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
845 assert!(result.is_some());
846 let blocks: Vec<u64> = result.unwrap().iter().collect();
847 assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
848
849 let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(5)) };
850 let provider = db.factory.database_provider_rw().unwrap();
851 let out = stage.execute(&provider, input).unwrap();
852 assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
853 provider.commit().unwrap();
854
855 let rocksdb = db.factory.rocksdb_provider();
856 let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
857 assert!(result.is_some(), "RocksDB should have merged data");
858 let blocks: Vec<u64> = result.unwrap().iter().collect();
859 assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
860 }
861
862 #[tokio::test]
864 async fn unwind_multi_shard() {
865 use reth_db_api::models::sharded_key::NUM_OF_INDICES_IN_SHARD;
866
867 let db = TestStageDB::default();
868 let num_blocks = (NUM_OF_INDICES_IN_SHARD * 2 + 100) as u64;
869 setup_v2_storage_data(&db, 0..=num_blocks - 1);
870
871 let input = ExecInput { target: Some(num_blocks - 1), ..Default::default() };
872 let mut stage = IndexStorageHistoryStage::default();
873 let provider = db.factory.database_provider_rw().unwrap();
874 let out = stage.execute(&provider, input).unwrap();
875 assert_eq!(
876 out,
877 ExecOutput { checkpoint: StageCheckpoint::new(num_blocks - 1), done: true }
878 );
879 provider.commit().unwrap();
880
881 let rocksdb = db.factory.rocksdb_provider();
882 let shards = rocksdb.storage_history_shards(ADDRESS, STORAGE_KEY).unwrap();
883 assert!(shards.len() >= 2, "Should have at least 2 shards for {} blocks", num_blocks);
884
885 let unwind_to = NUM_OF_INDICES_IN_SHARD as u64 + 50;
886 let unwind_input = UnwindInput {
887 checkpoint: StageCheckpoint::new(num_blocks - 1),
888 unwind_to,
889 bad_block: None,
890 };
891 let provider = db.factory.database_provider_rw().unwrap();
892 let out = stage.unwind(&provider, unwind_input).unwrap();
893 assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) });
894 provider.commit().unwrap();
895
896 let rocksdb = db.factory.rocksdb_provider();
897 let shards_after = rocksdb.storage_history_shards(ADDRESS, STORAGE_KEY).unwrap();
898 assert!(!shards_after.is_empty(), "Should still have shards after unwind");
899
900 let all_blocks: Vec<u64> =
901 shards_after.iter().flat_map(|(_, list)| list.iter()).collect();
902 assert_eq!(
903 all_blocks,
904 (0..=unwind_to).collect::<Vec<_>>(),
905 "Should only have blocks 0 to {} after unwind",
906 unwind_to
907 );
908 }
909 }
910}