1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{
4 user::history::{finalize_history_prune, HistoryPruneResult},
5 PruneInput, Segment,
6 },
7 PrunerError,
8};
9use alloy_primitives::{Address, BlockNumber, B256};
10use reth_db_api::{
11 models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress},
12 tables,
13 transaction::DbTxMut,
14};
15use reth_provider::{DBProvider, EitherWriter, RocksDBProviderFactory, StaticFileProviderFactory};
16use reth_prune_types::{
17 PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
18};
19use reth_static_file_types::StaticFileSegment;
20use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
21use rustc_hash::FxHashMap;
22use tracing::{instrument, trace};
23
24const STORAGE_HISTORY_TABLES_TO_PRUNE: usize = 2;
29
30#[derive(Debug)]
31pub struct StorageHistory {
32 mode: PruneMode,
33}
34
35impl StorageHistory {
36 pub const fn new(mode: PruneMode) -> Self {
37 Self { mode }
38 }
39}
40
41impl<Provider> Segment<Provider> for StorageHistory
42where
43 Provider: DBProvider<Tx: DbTxMut>
44 + StaticFileProviderFactory
45 + StorageChangeSetReader
46 + StorageSettingsCache
47 + RocksDBProviderFactory,
48{
49 fn segment(&self) -> PruneSegment {
50 PruneSegment::StorageHistory
51 }
52
53 fn mode(&self) -> Option<PruneMode> {
54 Some(self.mode)
55 }
56
57 fn purpose(&self) -> PrunePurpose {
58 PrunePurpose::User
59 }
60
61 #[instrument(
62 name = "StorageHistory::prune",
63 target = "pruner",
64 skip(self, provider),
65 ret(level = "trace")
66 )]
67 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
68 let range = match input.get_next_block_range() {
69 Some(range) => range,
70 None => {
71 trace!(target: "pruner", "No storage history to prune");
72 return Ok(SegmentOutput::done())
73 }
74 };
75 let range_end = *range.end();
76
77 if provider.cached_storage_settings().storage_v2 {
79 return self.prune_rocksdb(provider, input, range, range_end);
80 }
81
82 if EitherWriter::storage_changesets_destination(provider).is_static_file() {
84 self.prune_static_files(provider, input, range, range_end)
85 } else {
86 self.prune_database(provider, input, range, range_end)
87 }
88 }
89}
90
91impl StorageHistory {
92 fn prune_static_files<Provider>(
94 &self,
95 provider: &Provider,
96 input: PruneInput,
97 range: std::ops::RangeInclusive<BlockNumber>,
98 range_end: BlockNumber,
99 ) -> Result<SegmentOutput, PrunerError>
100 where
101 Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory,
102 {
103 let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
104 input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
105 } else {
106 input.limiter
107 };
108
109 if limiter.is_limit_reached() {
112 return Ok(SegmentOutput::not_done(
113 limiter.interrupt_reason(),
114 input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
115 ))
116 }
117
118 let mut highest_deleted_storages = FxHashMap::default();
124 let mut last_changeset_pruned_block = None;
125 let mut pruned_changesets = 0;
126 let mut done = true;
127
128 let walker = provider.static_file_provider().walk_storage_changeset_range(range);
129 for result in walker {
130 if limiter.is_limit_reached() {
131 done = false;
132 break;
133 }
134 let (block_address, entry) = result?;
135 let block_number = block_address.block_number();
136 let address = block_address.address();
137 highest_deleted_storages.insert((address, entry.key), block_number);
138 last_changeset_pruned_block = Some(block_number);
139 pruned_changesets += 1;
140 limiter.increment_deleted_entries_count();
141 }
142
143 if done && let Some(last_block) = last_changeset_pruned_block {
145 provider
146 .static_file_provider()
147 .delete_segment_below_block(StaticFileSegment::StorageChangeSets, last_block + 1)?;
148 }
149 trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned storage history (changesets from static files)");
150
151 let result = HistoryPruneResult {
152 highest_deleted: highest_deleted_storages,
153 last_pruned_block: last_changeset_pruned_block,
154 pruned_count: pruned_changesets,
155 done,
156 };
157 finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
158 provider,
159 result,
160 range_end,
161 &limiter,
162 |(address, storage_key), block_number| {
163 StorageShardedKey::new(address, storage_key, block_number)
164 },
165 |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
166 )
167 .map_err(Into::into)
168 }
169
170 fn prune_database<Provider>(
171 &self,
172 provider: &Provider,
173 input: PruneInput,
174 range: std::ops::RangeInclusive<BlockNumber>,
175 range_end: BlockNumber,
176 ) -> Result<SegmentOutput, PrunerError>
177 where
178 Provider: DBProvider<Tx: DbTxMut>,
179 {
180 let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
181 input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
182 } else {
183 input.limiter
184 };
185
186 if limiter.is_limit_reached() {
187 return Ok(SegmentOutput::not_done(
188 limiter.interrupt_reason(),
189 input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
190 ))
191 }
192
193 let mut last_changeset_pruned_block = None;
202 let mut highest_deleted_storages = FxHashMap::default();
203 let (pruned_changesets, done) =
204 provider.tx_ref().prune_table_with_range::<tables::StorageChangeSets>(
205 BlockNumberAddress::range(range),
206 &mut limiter,
207 |_| false,
208 |(BlockNumberAddress((block_number, address)), entry)| {
209 highest_deleted_storages.insert((address, entry.key), block_number);
210 last_changeset_pruned_block = Some(block_number);
211 },
212 )?;
213 trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)");
214
215 let result = HistoryPruneResult {
216 highest_deleted: highest_deleted_storages,
217 last_pruned_block: last_changeset_pruned_block,
218 pruned_count: pruned_changesets,
219 done,
220 };
221 finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
222 provider,
223 result,
224 range_end,
225 &limiter,
226 |(address, storage_key), block_number| {
227 StorageShardedKey::new(address, storage_key, block_number)
228 },
229 |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
230 )
231 .map_err(Into::into)
232 }
233
234 fn prune_rocksdb<Provider>(
239 &self,
240 provider: &Provider,
241 input: PruneInput,
242 range: std::ops::RangeInclusive<BlockNumber>,
243 range_end: BlockNumber,
244 ) -> Result<SegmentOutput, PrunerError>
245 where
246 Provider: DBProvider + StaticFileProviderFactory + RocksDBProviderFactory,
247 {
248 let mut limiter = input.limiter;
249
250 if limiter.is_limit_reached() {
251 return Ok(SegmentOutput::not_done(
252 limiter.interrupt_reason(),
253 input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
254 ))
255 }
256
257 let mut highest_deleted_storages: FxHashMap<_, _> = FxHashMap::default();
258 let mut last_changeset_pruned_block = None;
259 let mut changesets_processed = 0usize;
260 let mut done = true;
261
262 let walker = provider.static_file_provider().walk_storage_changeset_range(range);
266 for result in walker {
267 if limiter.is_limit_reached() {
268 done = false;
269 break;
270 }
271 let (block_address, entry) = result?;
272 let block_number = block_address.block_number();
273 let address = block_address.address();
274 highest_deleted_storages.insert((address, entry.key), block_number);
275 last_changeset_pruned_block = Some(block_number);
276 changesets_processed += 1;
277 limiter.increment_deleted_entries_count();
278 }
279
280 trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned storage changesets from static files");
281
282 let last_changeset_pruned_block = last_changeset_pruned_block
283 .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
284 .unwrap_or(range_end);
285
286 let mut deleted_shards = 0usize;
288 let mut updated_shards = 0usize;
289
290 let mut sorted_storages: Vec<_> = highest_deleted_storages.into_iter().collect();
292 sorted_storages.sort_unstable_by_key(|((addr, key), _)| (*addr, *key));
293
294 provider.with_rocksdb_batch(|mut batch| {
295 let targets: Vec<_> = sorted_storages
296 .iter()
297 .map(|((addr, key), highest)| {
298 ((*addr, *key), (*highest).min(last_changeset_pruned_block))
299 })
300 .collect();
301
302 let outcomes = batch.prune_storage_history_batch(&targets)?;
303 deleted_shards = outcomes.deleted;
304 updated_shards = outcomes.updated;
305
306 Ok(((), Some(batch.into_inner())))
307 })?;
308
309 trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned storage history (RocksDB indices)");
310
311 if done {
317 provider.static_file_provider().delete_segment_below_block(
318 StaticFileSegment::StorageChangeSets,
319 last_changeset_pruned_block + 1,
320 )?;
321 }
322
323 let progress = limiter.progress(done);
324
325 Ok(SegmentOutput {
326 progress,
327 pruned: changesets_processed + deleted_shards + updated_shards,
328 checkpoint: Some(SegmentOutputCheckpoint {
329 block_number: Some(last_changeset_pruned_block),
330 tx_number: None,
331 }),
332 })
333 }
334}
335
336#[cfg(test)]
337mod tests {
338 use super::STORAGE_HISTORY_TABLES_TO_PRUNE;
339 use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, StorageHistory};
340 use alloy_primitives::{BlockNumber, B256};
341 use assert_matches::assert_matches;
342 use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
343 use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
344 use reth_prune_types::{
345 PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
346 };
347 use reth_stages::test_utils::{StorageKind, TestStageDB};
348 use reth_storage_api::StorageSettingsCache;
349 use reth_testing_utils::generators::{
350 self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
351 };
352 use std::{collections::BTreeMap, ops::AddAssign};
353
354 #[test]
355 fn prune_legacy() {
356 let db = TestStageDB::default();
357 let mut rng = generators::rng();
358
359 let blocks = random_block_range(
360 &mut rng,
361 0..=5000,
362 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
363 );
364 db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
365
366 let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
367
368 let (changesets, _) = random_changeset_range(
369 &mut rng,
370 blocks.iter(),
371 accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
372 1..2,
373 1..2,
374 );
375 db.insert_changesets(changesets.clone(), None).expect("insert changesets");
376 db.insert_history(changesets.clone(), None).expect("insert history");
377
378 let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
379 BTreeMap::<_, usize>::new(),
380 |mut map, (key, _)| {
381 map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
382 map
383 },
384 );
385 assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
386
387 assert_eq!(
388 db.table::<tables::StorageChangeSets>().unwrap().len(),
389 changesets.iter().flatten().flat_map(|(_, _, entries)| entries).count()
390 );
391
392 let original_shards = db.table::<tables::StoragesHistory>().unwrap();
393
394 let test_prune = |to_block: BlockNumber,
395 run: usize,
396 expected_result: (PruneProgress, usize)| {
397 let prune_mode = PruneMode::Before(to_block);
398 let deleted_entries_limit = 1000;
399 let mut limiter =
400 PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
401 let input = PruneInput {
402 previous_checkpoint: db
403 .factory
404 .provider()
405 .unwrap()
406 .get_prune_checkpoint(PruneSegment::StorageHistory)
407 .unwrap(),
408 to_block,
409 limiter: limiter.clone(),
410 };
411 let segment = StorageHistory::new(prune_mode);
412
413 let provider = db.factory.database_provider_rw().unwrap();
414 provider.set_storage_settings_cache(StorageSettings::v1());
415 let result = segment.prune(&provider, input).unwrap();
416 limiter.increment_deleted_entries_count_by(result.pruned);
417
418 assert_matches!(
419 result,
420 SegmentOutput {progress, pruned, checkpoint: Some(_)}
421 if (progress, pruned) == expected_result
422 );
423
424 segment
425 .save_checkpoint(
426 &provider,
427 result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
428 )
429 .unwrap();
430 provider.commit().expect("commit");
431
432 let changesets = changesets
433 .iter()
434 .enumerate()
435 .flat_map(|(block_number, changeset)| {
436 changeset.iter().flat_map(move |(address, _, entries)| {
437 entries.iter().map(move |entry| (block_number, address, entry))
438 })
439 })
440 .collect::<Vec<_>>();
441
442 #[expect(clippy::skip_while_next)]
443 let pruned = changesets
444 .iter()
445 .enumerate()
446 .skip_while(|(i, (block_number, _, _))| {
447 *i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
448 *block_number <= to_block as usize
449 })
450 .next()
451 .map(|(i, _)| i)
452 .unwrap_or_default();
453
454 let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
457
458 let last_pruned_block_number = pruned_changesets
459 .next()
460 .map(|(block_number, _, _)| {
461 (if result.progress.is_finished() {
462 *block_number
463 } else {
464 block_number.saturating_sub(1)
465 }) as BlockNumber
466 })
467 .unwrap_or(to_block);
468
469 let pruned_changesets = pruned_changesets.fold(
470 BTreeMap::<_, Vec<_>>::new(),
471 |mut acc, (block_number, address, entry)| {
472 acc.entry((block_number, address)).or_default().push(entry);
473 acc
474 },
475 );
476
477 assert_eq!(
478 db.table::<tables::StorageChangeSets>().unwrap().len(),
479 pruned_changesets.values().flatten().count()
480 );
481
482 let actual_shards = db.table::<tables::StoragesHistory>().unwrap();
483
484 let expected_shards = original_shards
485 .iter()
486 .filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
487 .map(|(key, blocks)| {
488 let new_blocks =
489 blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
490 (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
491 })
492 .collect::<Vec<_>>();
493
494 assert_eq!(actual_shards, expected_shards);
495
496 assert_eq!(
497 db.factory
498 .provider()
499 .unwrap()
500 .get_prune_checkpoint(PruneSegment::StorageHistory)
501 .unwrap(),
502 Some(PruneCheckpoint {
503 block_number: Some(last_pruned_block_number),
504 tx_number: None,
505 prune_mode
506 })
507 );
508 };
509
510 test_prune(
511 998,
512 1,
513 (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
514 );
515 test_prune(998, 2, (PruneProgress::Finished, 499));
516 test_prune(1200, 3, (PruneProgress::Finished, 202));
517 }
518
519 #[test]
522 fn prune_partial_progress_mid_block() {
523 use alloy_primitives::{Address, U256};
524 use reth_primitives_traits::Account;
525 use reth_testing_utils::generators::ChangeSet;
526
527 let db = TestStageDB::default();
528 let mut rng = generators::rng();
529
530 let blocks = random_block_range(
532 &mut rng,
533 0..=10,
534 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
535 );
536 db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
537
538 let addr1 = Address::with_last_byte(1);
540 let addr2 = Address::with_last_byte(2);
541
542 let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };
543
544 let storage_entry = |key: u8| reth_primitives_traits::StorageEntry {
546 key: B256::with_last_byte(key),
547 value: U256::from(100),
548 };
549
550 let changesets: Vec<ChangeSet> = vec![
553 vec![(addr1, account, vec![storage_entry(1)])], vec![(addr1, account, vec![storage_entry(1)])], vec![(addr1, account, vec![storage_entry(1)])], vec![(addr1, account, vec![storage_entry(1)])], vec![(addr1, account, vec![storage_entry(1)])], vec![
561 (addr1, account, vec![storage_entry(1), storage_entry(2)]),
562 (addr2, account, vec![storage_entry(1), storage_entry(2)]),
563 ],
564 vec![(addr1, account, vec![storage_entry(3)])], ];
566
567 db.insert_changesets(changesets.clone(), None).expect("insert changesets");
568 db.insert_history(changesets.clone(), None).expect("insert history");
569
570 let total_storage_entries: usize =
572 changesets.iter().flat_map(|c| c.iter()).map(|(_, _, entries)| entries.len()).sum();
573 assert_eq!(db.table::<tables::StorageChangeSets>().unwrap().len(), total_storage_entries);
574
575 let prune_mode = PruneMode::Before(10);
576
577 let deleted_entries_limit = 14; let limiter = PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
582
583 let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
584 let segment = StorageHistory::new(prune_mode);
585
586 let provider = db.factory.database_provider_rw().unwrap();
587 provider.set_storage_settings_cache(StorageSettings::v1());
588 let result = segment.prune(&provider, input).unwrap();
589
590 assert!(!result.progress.is_finished(), "Expected HasMoreData since we stopped mid-block");
592
593 segment
595 .save_checkpoint(&provider, result.checkpoint.unwrap().as_prune_checkpoint(prune_mode))
596 .unwrap();
597 provider.commit().expect("commit");
598
599 let checkpoint = db
601 .factory
602 .provider()
603 .unwrap()
604 .get_prune_checkpoint(PruneSegment::StorageHistory)
605 .unwrap()
606 .expect("checkpoint should exist");
607
608 assert_eq!(
609 checkpoint.block_number,
610 Some(4),
611 "Checkpoint should be block 4 (block before incomplete block 5)"
612 );
613
614 let remaining_changesets = db.table::<tables::StorageChangeSets>().unwrap();
616 assert!(
617 !remaining_changesets.is_empty(),
618 "Should have remaining changesets for blocks 5-6"
619 );
620
621 let history = db.table::<tables::StoragesHistory>().unwrap();
623 for (key, _blocks) in &history {
624 assert!(
625 key.sharded_key.highest_block_number > 4,
626 "Found stale history shard with highest_block_number {} <= checkpoint 4",
627 key.sharded_key.highest_block_number
628 );
629 }
630
631 let input2 = PruneInput {
633 previous_checkpoint: Some(checkpoint),
634 to_block: 10,
635 limiter: PruneLimiter::default().set_deleted_entries_limit(100), };
637
638 let provider2 = db.factory.database_provider_rw().unwrap();
639 provider2.set_storage_settings_cache(StorageSettings::v1());
640 let result2 = segment.prune(&provider2, input2).unwrap();
641
642 assert!(result2.progress.is_finished(), "Second run should complete");
643
644 segment
645 .save_checkpoint(
646 &provider2,
647 result2.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
648 )
649 .unwrap();
650 provider2.commit().expect("commit");
651
652 let final_checkpoint = db
654 .factory
655 .provider()
656 .unwrap()
657 .get_prune_checkpoint(PruneSegment::StorageHistory)
658 .unwrap()
659 .expect("checkpoint should exist");
660
661 assert_eq!(final_checkpoint.block_number, Some(6), "Final checkpoint should be at block 6");
663
664 let final_changesets = db.table::<tables::StorageChangeSets>().unwrap();
666 assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
667 }
668
669 #[test]
670 fn prune_rocksdb() {
671 use reth_db_api::models::storage_sharded_key::StorageShardedKey;
672 use reth_provider::RocksDBProviderFactory;
673 use reth_storage_api::StorageSettings;
674
675 let db = TestStageDB::default();
676 let mut rng = generators::rng();
677
678 let blocks = random_block_range(
679 &mut rng,
680 0..=100,
681 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
682 );
683 db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
684
685 let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
686
687 let (changesets, _) = random_changeset_range(
688 &mut rng,
689 blocks.iter(),
690 accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
691 1..2,
692 1..2,
693 );
694
695 db.insert_changesets_to_static_files(changesets.clone(), None)
696 .expect("insert changesets to static files");
697
698 let mut storage_indices: BTreeMap<(alloy_primitives::Address, B256), Vec<u64>> =
699 BTreeMap::new();
700 for (block, changeset) in changesets.iter().enumerate() {
701 for (address, _, storage_entries) in changeset {
702 for entry in storage_entries {
703 storage_indices.entry((*address, entry.key)).or_default().push(block as u64);
704 }
705 }
706 }
707
708 {
709 let rocksdb = db.factory.rocksdb_provider();
710 let mut batch = rocksdb.batch();
711 for ((address, storage_key), block_numbers) in &storage_indices {
712 let shard = BlockNumberList::new_pre_sorted(block_numbers.clone());
713 batch
714 .put::<tables::StoragesHistory>(
715 StorageShardedKey::last(*address, *storage_key),
716 &shard,
717 )
718 .expect("insert storage history shard");
719 }
720 batch.commit().expect("commit rocksdb batch");
721 }
722
723 {
724 let rocksdb = db.factory.rocksdb_provider();
725 for (address, storage_key) in storage_indices.keys() {
726 let shards = rocksdb.storage_history_shards(*address, *storage_key).unwrap();
727 assert!(!shards.is_empty(), "RocksDB should contain storage history before prune");
728 }
729 }
730
731 let to_block = 50u64;
732 let prune_mode = PruneMode::Before(to_block);
733 let input =
734 PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
735 let segment = StorageHistory::new(prune_mode);
736
737 let provider = db.factory.database_provider_rw().unwrap();
738 provider.set_storage_settings_cache(StorageSettings::v2());
739 let result = segment.prune(&provider, input).unwrap();
740 provider.commit().expect("commit");
741
742 assert_matches!(
743 result,
744 SegmentOutput { progress: PruneProgress::Finished, checkpoint: Some(_), .. }
745 );
746
747 {
748 let rocksdb = db.factory.rocksdb_provider();
749 for ((address, storage_key), block_numbers) in &storage_indices {
750 let shards = rocksdb.storage_history_shards(*address, *storage_key).unwrap();
751
752 let remaining_blocks: Vec<u64> =
753 block_numbers.iter().copied().filter(|&b| b > to_block).collect();
754
755 if remaining_blocks.is_empty() {
756 assert!(
757 shards.is_empty(),
758 "Shard for {:?}/{:?} should be deleted when all blocks pruned",
759 address,
760 storage_key
761 );
762 } else {
763 assert!(!shards.is_empty(), "Shard should exist with remaining blocks");
764 let actual_blocks: Vec<u64> =
765 shards.iter().flat_map(|(_, list)| list.iter()).collect();
766 assert_eq!(
767 actual_blocks, remaining_blocks,
768 "RocksDB shard should only contain blocks > {}",
769 to_block
770 );
771 }
772 }
773 }
774 }
775}