use crate::{
    db_ext::DbTxPruneExt,
    segments::{
        user::history::{finalize_history_prune, HistoryPruneResult},
        PruneInput, Segment,
    },
    PrunerError,
};
use alloy_primitives::{Address, BlockNumber, B256};
use reth_db_api::{
    models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress},
    tables,
    transaction::DbTxMut,
};
use reth_provider::{DBProvider, EitherWriter, RocksDBProviderFactory, StaticFileProviderFactory};
use reth_prune_types::{
    PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};

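/// Number of tables pruned per run: [`tables::StorageChangeSets`] and
/// [`tables::StoragesHistory`]. The deleted-entries budget is split evenly between them.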
const STORAGE_HISTORY_TABLES_TO_PRUNE: usize = 2;

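/// Prune segment responsible for the storage history tables:
/// [`tables::StorageChangeSets`] and [`tables::StoragesHistory`].
///
/// A minimal usage sketch (the prune mode value is illustrative):
///
/// ```ignore
/// let segment = StorageHistory::new(PruneMode::Before(1_000));
/// ```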
#[derive(Debug)]
pub struct StorageHistory {
    mode: PruneMode,
}

impl StorageHistory {
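    /// Creates a new [`StorageHistory`] segment with the given prune mode.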
    pub const fn new(mode: PruneMode) -> Self {
        Self { mode }
    }
}

impl<Provider> Segment<Provider> for StorageHistory
where
    Provider: DBProvider<Tx: DbTxMut>
        + StaticFileProviderFactory
        + StorageChangeSetReader
        + StorageSettingsCache
        + RocksDBProviderFactory,
{
    fn segment(&self) -> PruneSegment {
        PruneSegment::StorageHistory
    }

    fn mode(&self) -> Option<PruneMode> {
        Some(self.mode)
    }

    fn purpose(&self) -> PrunePurpose {
        PrunePurpose::User
    }

    #[instrument(
        name = "StorageHistory::prune",
        target = "pruner",
        skip(self, provider),
        ret(level = "trace")
    )]
    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
        let range = match input.get_next_block_range() {
            Some(range) => range,
            None => {
                trace!(target: "pruner", "No storage history to prune");
                return Ok(SegmentOutput::done())
            }
        };
        let range_end = *range.end();

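        // Storage settings v2 keeps changesets in static files and history indices in RocksDB,
        // so pruning takes a dedicated path when that backend is compiled in and enabled.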
        #[cfg(all(unix, feature = "rocksdb"))]
        if provider.cached_storage_settings().storage_v2 {
            return self.prune_rocksdb(provider, input, range, range_end);
        }

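        // Otherwise dispatch on where storage changesets are written: static files or database.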
        if EitherWriter::storage_changesets_destination(provider).is_static_file() {
            self.prune_static_files(provider, input, range, range_end)
        } else {
            self.prune_database(provider, input, range, range_end)
        }
    }
}

impl StorageHistory {
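    /// Prunes storage changesets stored in static files and updates the corresponding
    /// history index shards in the database.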
    fn prune_static_files<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory,
    {
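        // Split the deleted-entries budget evenly between the changesets and history tables.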
        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
            input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
        } else {
            input.limiter
        };

        if limiter.is_limit_reached() {
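            // The budget is already exhausted (e.g. the limit is smaller than the number of
            // tables to prune), so keep the previous checkpoint and come back later.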
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

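        // Deleted changeset keys (address and storage slot), each mapped to the highest block
        // number it was deleted at; used below to trim the history index shards.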
        let mut highest_deleted_storages = FxHashMap::default();
        let mut last_changeset_pruned_block = None;
        let mut pruned_changesets = 0;
        let mut done = true;

        let walker = provider.static_file_provider().walk_storage_changeset_range(range);
        for result in walker {
            if limiter.is_limit_reached() {
                done = false;
                break;
            }
            let (block_address, entry) = result?;
            let block_number = block_address.block_number();
            let address = block_address.address();
            highest_deleted_storages.insert((address, entry.key.as_b256()), block_number);
            last_changeset_pruned_block = Some(block_number);
            pruned_changesets += 1;
            limiter.increment_deleted_entries_count();
        }

        if done && let Some(last_block) = last_changeset_pruned_block {
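            // The whole range was processed, so it is safe to drop the static files for all
            // blocks up to and including `last_block`.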
            provider
                .static_file_provider()
                .delete_segment_below_block(StaticFileSegment::StorageChangeSets, last_block + 1)?;
        }
        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned storage history (changesets from static files)");

        let result = HistoryPruneResult {
            highest_deleted: highest_deleted_storages,
            last_pruned_block: last_changeset_pruned_block,
            pruned_count: pruned_changesets,
            done,
        };
        finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
            provider,
            result,
            range_end,
            &limiter,
            |(address, storage_key), block_number| {
                StorageShardedKey::new(address, storage_key, block_number)
            },
            |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
        )
        .map_err(Into::into)
    }

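    /// Prunes storage changesets stored in the database and updates the corresponding
    /// history index shards.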
    fn prune_database<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider<Tx: DbTxMut>,
    {
        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
            input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
        } else {
            input.limiter
        };

        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        let mut last_changeset_pruned_block = None;
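        // Deleted changeset keys (address and storage slot), each mapped to the highest block
        // number it was deleted at; used below to trim the history index shards.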
        let mut highest_deleted_storages = FxHashMap::default();
        let (pruned_changesets, done) =
            provider.tx_ref().prune_table_with_range::<tables::StorageChangeSets>(
                BlockNumberAddress::range(range),
                &mut limiter,
                |_| false,
                |(BlockNumberAddress((block_number, address)), entry)| {
                    highest_deleted_storages.insert((address, entry.key), block_number);
                    last_changeset_pruned_block = Some(block_number);
                },
            )?;
        trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)");

        let result = HistoryPruneResult {
            highest_deleted: highest_deleted_storages,
            last_pruned_block: last_changeset_pruned_block,
            pruned_count: pruned_changesets,
            done,
        };
        finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
            provider,
            result,
            range_end,
            &limiter,
            |(address, storage_key), block_number| {
                StorageShardedKey::new(address, storage_key, block_number)
            },
            |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
        )
        .map_err(Into::into)
    }

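    /// Prunes storage history when storage settings v2 are enabled: changesets are read from
    /// static files and the history index shards live in RocksDB.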
    #[cfg(all(unix, feature = "rocksdb"))]
    fn prune_rocksdb<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider + StaticFileProviderFactory + RocksDBProviderFactory,
    {
        let mut limiter = input.limiter;

        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        let mut highest_deleted_storages: FxHashMap<_, _> = FxHashMap::default();
        let mut last_changeset_pruned_block = None;
        let mut changesets_processed = 0usize;
        let mut done = true;

        let walker = provider.static_file_provider().walk_storage_changeset_range(range);
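        // Walk the changesets and record, per (address, slot), the highest block touched; the
        // static files themselves are only deleted once the whole range is done.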
        for result in walker {
            if limiter.is_limit_reached() {
                done = false;
                break;
            }
            let (block_address, entry) = result?;
            let block_number = block_address.block_number();
            let address = block_address.address();
            highest_deleted_storages.insert((address, entry.key.as_b256()), block_number);
            last_changeset_pruned_block = Some(block_number);
            changesets_processed += 1;
            limiter.increment_deleted_entries_count();
        }

        trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned storage changesets from static files");

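        // If the walk was interrupted mid-block, only blocks strictly below the interrupted one
        // are fully processed, so step the checkpoint back by one block.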
        let last_changeset_pruned_block = last_changeset_pruned_block
            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
            .unwrap_or(range_end);

        let mut deleted_shards = 0usize;
        let mut updated_shards = 0usize;

        let mut sorted_storages: Vec<_> = highest_deleted_storages.into_iter().collect();
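        // Sort so the RocksDB batch is applied in a deterministic key order.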
        sorted_storages.sort_unstable_by_key(|((addr, key), _)| (*addr, *key));

        provider.with_rocksdb_batch(|mut batch| {
            let targets: Vec<_> = sorted_storages
                .iter()
                .map(|((addr, key), highest)| {
                    ((*addr, *key), (*highest).min(last_changeset_pruned_block))
                })
                .collect();

            let outcomes = batch.prune_storage_history_batch(&targets)?;
            deleted_shards = outcomes.deleted;
            updated_shards = outcomes.updated;

            Ok(((), Some(batch.into_inner())))
        })?;

        trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned storage history (RocksDB indices)");

        if done {
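            // The whole range was pruned, so the changeset static files below the checkpoint
            // are no longer needed.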
            provider.static_file_provider().delete_segment_below_block(
                StaticFileSegment::StorageChangeSets,
                last_changeset_pruned_block + 1,
            )?;
        }

        let progress = limiter.progress(done);

        Ok(SegmentOutput {
            progress,
            pruned: changesets_processed + deleted_shards + updated_shards,
            checkpoint: Some(SegmentOutputCheckpoint {
                block_number: Some(last_changeset_pruned_block),
                tx_number: None,
            }),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::STORAGE_HISTORY_TABLES_TO_PRUNE;
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, StorageHistory};
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_storage_api::StorageSettingsCache;
    use reth_testing_utils::generators::{
        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
    };
    use std::{collections::BTreeMap, ops::AddAssign};

    #[test]
    fn prune_legacy() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            1..2,
            1..2,
        );
        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
                map
            },
        );
        assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        assert_eq!(
            db.table::<tables::StorageChangeSets>().unwrap().len(),
            changesets.iter().flatten().flat_map(|(_, _, entries)| entries).count()
        );

        let original_shards = db.table::<tables::StoragesHistory>().unwrap();

        let test_prune = |to_block: BlockNumber,
                          run: usize,
                          expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let deleted_entries_limit = 1000;
            let mut limiter =
                PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                to_block,
                limiter: limiter.clone(),
            };
            let segment = StorageHistory::new(prune_mode);

            let provider = db.factory.database_provider_rw().unwrap();
            provider.set_storage_settings_cache(StorageSettings::v1());
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput { progress, pruned, checkpoint: Some(_) }
                    if (progress, pruned) == expected_result
            );

            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            let changesets = changesets
                .iter()
                .enumerate()
                .flat_map(|(block_number, changeset)| {
                    changeset.iter().flat_map(move |(address, _, entries)| {
                        entries.iter().map(move |entry| (block_number, address, entry))
                    })
                })
                .collect::<Vec<_>>();

            #[expect(clippy::skip_while_next)]
            let pruned = changesets
                .iter()
                .enumerate()
                .skip_while(|(i, (block_number, _, _))| {
                    *i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
                        *block_number <= to_block as usize
                })
                .next()
                .map(|(i, _)| i)
                .unwrap_or_default();

            let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
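            // The iterator now starts at the last pruned changeset; its block number yields the
            // last pruned block, stepped back by one if the run was interrupted mid-block.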

            let last_pruned_block_number = pruned_changesets
                .next()
                .map(|(block_number, _, _)| {
                    (if result.progress.is_finished() {
                        *block_number
                    } else {
                        block_number.saturating_sub(1)
                    }) as BlockNumber
                })
                .unwrap_or(to_block);

            let pruned_changesets = pruned_changesets.fold(
                BTreeMap::<_, Vec<_>>::new(),
                |mut acc, (block_number, address, entry)| {
                    acc.entry((block_number, address)).or_default().push(entry);
                    acc
                },
            );

            assert_eq!(
                db.table::<tables::StorageChangeSets>().unwrap().len(),
                pruned_changesets.values().flatten().count()
            );

            let actual_shards = db.table::<tables::StoragesHistory>().unwrap();

            let expected_shards = original_shards
                .iter()
                .filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
                .map(|(key, blocks)| {
                    let new_blocks =
                        blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                    (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                })
                .collect::<Vec<_>>();

            assert_eq!(actual_shards, expected_shards);

            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: Some(last_pruned_block_number),
                    tx_number: None,
                    prune_mode
                })
            );
        };

        test_prune(
            998,
            1,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
        );
        test_prune(998, 2, (PruneProgress::Finished, 499));
        test_prune(1200, 3, (PruneProgress::Finished, 202));
    }

    #[test]
    #[cfg(not(all(unix, feature = "rocksdb")))]
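    // With the `rocksdb` feature enabled, storage settings v2 routes this path through RocksDB
    // instead of static files, so this variant is compiled out there.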
    fn prune_static_file() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            1..2,
            1..2,
        );

        db.insert_changesets_to_static_files(changesets.clone(), None)
            .expect("insert changesets to static files");
        db.insert_history(changesets.clone(), None).expect("insert history");

        let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
                map
            },
        );
        assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        let original_shards = db.table::<tables::StoragesHistory>().unwrap();

        let test_prune = |to_block: BlockNumber,
                          run: usize,
                          expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let deleted_entries_limit = 1000;
            let mut limiter =
                PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                to_block,
                limiter: limiter.clone(),
            };
            let segment = StorageHistory::new(prune_mode);

            let provider = db.factory.database_provider_rw().unwrap();
            provider.set_storage_settings_cache(StorageSettings::v2());
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput { progress, pruned, checkpoint: Some(_) }
                    if (progress, pruned) == expected_result
            );

            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            let changesets = changesets
                .iter()
                .enumerate()
                .flat_map(|(block_number, changeset)| {
                    changeset.iter().flat_map(move |(address, _, entries)| {
                        entries.iter().map(move |entry| (block_number, address, entry))
                    })
                })
                .collect::<Vec<_>>();

            #[expect(clippy::skip_while_next)]
            let pruned = changesets
                .iter()
                .enumerate()
                .skip_while(|(i, (block_number, _, _))| {
                    *i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
                        *block_number <= to_block as usize
                })
                .next()
                .map(|(i, _)| i)
                .unwrap_or_default();

            let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
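            // The iterator now starts at the last pruned changeset; its block number yields the
            // last pruned block, stepped back by one if the run was interrupted mid-block.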

            let last_pruned_block_number = pruned_changesets
                .next()
                .map(|(block_number, _, _)| {
                    (if result.progress.is_finished() {
                        *block_number
                    } else {
                        block_number.saturating_sub(1)
                    }) as BlockNumber
                })
                .unwrap_or(to_block);

            let actual_shards = db.table::<tables::StoragesHistory>().unwrap();

            let expected_shards = original_shards
                .iter()
                .filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
                .map(|(key, blocks)| {
                    let new_blocks =
                        blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                    (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                })
                .collect::<Vec<_>>();

            assert_eq!(actual_shards, expected_shards);

            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: Some(last_pruned_block_number),
                    tx_number: None,
                    prune_mode
                })
            );
        };

        test_prune(
            998,
            1,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
        );
        test_prune(998, 2, (PruneProgress::Finished, 500));
        test_prune(1200, 3, (PruneProgress::Finished, 202));
    }

    #[test]
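    // Interrupting a prune mid-block must checkpoint the last fully pruned block and leave no
    // stale history shards behind.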
    fn prune_partial_progress_mid_block() {
        use alloy_primitives::{Address, U256};
        use reth_primitives_traits::Account;
        use reth_testing_utils::generators::ChangeSet;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let addr1 = Address::with_last_byte(1);
        let addr2 = Address::with_last_byte(2);

        let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };

        let storage_entry = |key: u8| reth_primitives_traits::StorageEntry {
            key: B256::with_last_byte(key),
            value: U256::from(100),
        };

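        // Blocks 0-4 each change one slot, block 5 changes four entries across two accounts,
        // and block 6 changes one more slot: 10 storage changeset entries in total.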
        let changesets: Vec<ChangeSet> = vec![
            vec![(addr1, account, vec![storage_entry(1)])], // block 0
            vec![(addr1, account, vec![storage_entry(1)])], // block 1
            vec![(addr1, account, vec![storage_entry(1)])], // block 2
            vec![(addr1, account, vec![storage_entry(1)])], // block 3
            vec![(addr1, account, vec![storage_entry(1)])], // block 4
            vec![
                (addr1, account, vec![storage_entry(1), storage_entry(2)]),
                (addr2, account, vec![storage_entry(1), storage_entry(2)]),
            ], // block 5
            vec![(addr1, account, vec![storage_entry(3)])], // block 6
        ];

        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        let total_storage_entries: usize =
            changesets.iter().flat_map(|c| c.iter()).map(|(_, _, entries)| entries.len()).sum();
        assert_eq!(db.table::<tables::StorageChangeSets>().unwrap().len(), total_storage_entries);

        let prune_mode = PruneMode::Before(10);

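        // A limit of 14 leaves 14 / 2 = 7 deletions for the changesets table: blocks 0-4 use 5
        // of them, so the run stops after 2 of block 5's 4 entries, i.e. mid-block.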
        let deleted_entries_limit = 14;
        let limiter = PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);

        let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
        let segment = StorageHistory::new(prune_mode);

        let provider = db.factory.database_provider_rw().unwrap();
        provider.set_storage_settings_cache(StorageSettings::v1());
        let result = segment.prune(&provider, input).unwrap();

        assert!(!result.progress.is_finished(), "Expected HasMoreData since we stopped mid-block");

        segment
            .save_checkpoint(&provider, result.checkpoint.unwrap().as_prune_checkpoint(prune_mode))
            .unwrap();
        provider.commit().expect("commit");

        let checkpoint = db
            .factory
            .provider()
            .unwrap()
            .get_prune_checkpoint(PruneSegment::StorageHistory)
            .unwrap()
            .expect("checkpoint should exist");

        assert_eq!(
            checkpoint.block_number,
            Some(4),
            "Checkpoint should be block 4 (block before incomplete block 5)"
        );

        let remaining_changesets = db.table::<tables::StorageChangeSets>().unwrap();
        assert!(
            !remaining_changesets.is_empty(),
            "Should have remaining changesets for blocks 5-6"
        );

        let history = db.table::<tables::StoragesHistory>().unwrap();
        for (key, _blocks) in &history {
            assert!(
                key.sharded_key.highest_block_number > 4,
                "Found stale history shard with highest_block_number {} <= checkpoint 4",
                key.sharded_key.highest_block_number
            );
        }

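        // Second run: a generous budget should finish pruning the remaining blocks 5 and 6.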
        let input2 = PruneInput {
            previous_checkpoint: Some(checkpoint),
            to_block: 10,
            limiter: PruneLimiter::default().set_deleted_entries_limit(100),
        };

        let provider2 = db.factory.database_provider_rw().unwrap();
        provider2.set_storage_settings_cache(StorageSettings::v1());
        let result2 = segment.prune(&provider2, input2).unwrap();

        assert!(result2.progress.is_finished(), "Second run should complete");

        segment
            .save_checkpoint(
                &provider2,
                result2.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
            )
            .unwrap();
        provider2.commit().expect("commit");

        let final_checkpoint = db
            .factory
            .provider()
            .unwrap()
            .get_prune_checkpoint(PruneSegment::StorageHistory)
            .unwrap()
            .expect("checkpoint should exist");

        assert_eq!(final_checkpoint.block_number, Some(6), "Final checkpoint should be at block 6");

        let final_changesets = db.table::<tables::StorageChangeSets>().unwrap();
        assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
    }

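    // End-to-end prune over RocksDB-backed history indices: changesets live in static files and
    // every (address, slot) shard must be deleted or trimmed to blocks above the target.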
    #[cfg(all(unix, feature = "rocksdb"))]
    #[test]
    fn prune_rocksdb() {
        use reth_db_api::models::storage_sharded_key::StorageShardedKey;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettings;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=100,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            1..2,
            1..2,
        );

        db.insert_changesets_to_static_files(changesets.clone(), None)
            .expect("insert changesets to static files");

        let mut storage_indices: BTreeMap<(alloy_primitives::Address, B256), Vec<u64>> =
            BTreeMap::new();
        for (block, changeset) in changesets.iter().enumerate() {
            for (address, _, storage_entries) in changeset {
                for entry in storage_entries {
                    storage_indices.entry((*address, entry.key)).or_default().push(block as u64);
                }
            }
        }

        {
            let rocksdb = db.factory.rocksdb_provider();
            let mut batch = rocksdb.batch();
            for ((address, storage_key), block_numbers) in &storage_indices {
                let shard = BlockNumberList::new_pre_sorted(block_numbers.clone());
                batch
                    .put::<tables::StoragesHistory>(
                        StorageShardedKey::last(*address, *storage_key),
                        &shard,
                    )
                    .expect("insert storage history shard");
            }
            batch.commit().expect("commit rocksdb batch");
        }

        {
            let rocksdb = db.factory.rocksdb_provider();
            for (address, storage_key) in storage_indices.keys() {
                let shards = rocksdb.storage_history_shards(*address, *storage_key).unwrap();
                assert!(!shards.is_empty(), "RocksDB should contain storage history before prune");
            }
        }

        let to_block = 50u64;
        let prune_mode = PruneMode::Before(to_block);
        let input =
            PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
        let segment = StorageHistory::new(prune_mode);

        let provider = db.factory.database_provider_rw().unwrap();
        provider.set_storage_settings_cache(StorageSettings::v2());
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        assert_matches!(
            result,
            SegmentOutput { progress: PruneProgress::Finished, checkpoint: Some(_), .. }
        );

        {
            let rocksdb = db.factory.rocksdb_provider();
            for ((address, storage_key), block_numbers) in &storage_indices {
                let shards = rocksdb.storage_history_shards(*address, *storage_key).unwrap();

                let remaining_blocks: Vec<u64> =
                    block_numbers.iter().copied().filter(|&b| b > to_block).collect();

                if remaining_blocks.is_empty() {
                    assert!(
                        shards.is_empty(),
                        "Shard for {:?}/{:?} should be deleted when all blocks pruned",
                        address,
                        storage_key
                    );
                } else {
                    assert!(!shards.is_empty(), "Shard should exist with remaining blocks");
                    let actual_blocks: Vec<u64> =
                        shards.iter().flat_map(|(_, list)| list.iter()).collect();
                    assert_eq!(
                        actual_blocks, remaining_blocks,
                        "RocksDB shard should only contain blocks > {}",
                        to_block
                    );
                }
            }
        }
    }
}