reth_prune/segments/user/storage_history.rs

use crate::{
    db_ext::DbTxPruneExt,
    segments::{
        user::history::{finalize_history_prune, HistoryPruneResult},
        PruneInput, Segment,
    },
    PrunerError,
};
use alloy_primitives::{Address, BlockNumber, B256};
use reth_db_api::{
    models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress},
    tables,
    transaction::DbTxMut,
};
use reth_provider::{DBProvider, EitherWriter, RocksDBProviderFactory, StaticFileProviderFactory};
use reth_prune_types::{
    PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};

/// Number of storage history tables to prune in one step.
///
/// Storage History consists of two tables: [`tables::StorageChangeSets`] and
/// [`tables::StoragesHistory`]. We want to prune them to the same block number.
const STORAGE_HISTORY_TABLES_TO_PRUNE: usize = 2;

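/// Segment responsible for pruning storage history.
///
/// Prunes the storage changesets and the matching storage history index shards up to the
/// configured block, dispatching to the MDBX, static-file, or `RocksDB` code path depending
/// on where the data is stored.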
#[derive(Debug)]
pub struct StorageHistory {
    mode: PruneMode,
}

impl StorageHistory {
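    /// Creates a new instance of [`StorageHistory`] with the given prune mode.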
    pub const fn new(mode: PruneMode) -> Self {
        Self { mode }
    }
}

impl<Provider> Segment<Provider> for StorageHistory
where
    Provider: DBProvider<Tx: DbTxMut>
        + StaticFileProviderFactory
        + StorageChangeSetReader
        + StorageSettingsCache
        + RocksDBProviderFactory,
{
    fn segment(&self) -> PruneSegment {
        PruneSegment::StorageHistory
    }

    fn mode(&self) -> Option<PruneMode> {
        Some(self.mode)
    }

    fn purpose(&self) -> PrunePurpose {
        PrunePurpose::User
    }

    #[instrument(
        name = "StorageHistory::prune",
        target = "pruner",
        skip(self, provider),
        ret(level = "trace")
    )]
    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
        let range = match input.get_next_block_range() {
            Some(range) => range,
            None => {
                trace!(target: "pruner", "No storage history to prune");
                return Ok(SegmentOutput::done())
            }
        };
        let range_end = *range.end();

        // Check where storage history indices are stored
        if provider.cached_storage_settings().storage_v2 {
            return self.prune_rocksdb(provider, input, range, range_end);
        }

        // Legacy path (history indices in MDBX): check where storage changesets are stored
        if EitherWriter::storage_changesets_destination(provider).is_static_file() {
            self.prune_static_files(provider, input, range, range_end)
        } else {
            self.prune_database(provider, input, range, range_end)
        }
    }
}

impl StorageHistory {
    /// Prunes storage history when changesets are stored in static files.
    fn prune_static_files<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory,
    {
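        // Split the deletion budget evenly between the two storage history tables so that
        // changesets and history indices end up pruned to the same block.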
        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
            input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
        } else {
            input.limiter
        };

        // The limiter may already be exhausted from a previous segment in the same prune run.
        // Early exit avoids unnecessary iteration when no budget remains.
        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        // The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
        // STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current defaults it's usually `3500 * 5
        // / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
        // size should be up to ~0.5MB + some hashmap overhead. `blocks_since_last_run` is
        // additionally limited by the `max_reorg_depth`, so no OOM is expected here.
        let mut highest_deleted_storages = FxHashMap::default();
        let mut last_changeset_pruned_block = None;
        let mut pruned_changesets = 0;
        let mut done = true;

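        // Walk storage changesets from static files, tracking for each (address, storage key)
        // pair the highest block pruned so the matching history shards can be trimmed below.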
        let walker = provider.static_file_provider().walk_storage_changeset_range(range);
        for result in walker {
            if limiter.is_limit_reached() {
                done = false;
                break;
            }
            let (block_address, entry) = result?;
            let block_number = block_address.block_number();
            let address = block_address.address();
            highest_deleted_storages.insert((address, entry.key), block_number);
            last_changeset_pruned_block = Some(block_number);
            pruned_changesets += 1;
            limiter.increment_deleted_entries_count();
        }

        // Delete static file jars only when fully processed
        if done && let Some(last_block) = last_changeset_pruned_block {
            provider
                .static_file_provider()
                .delete_segment_below_block(StaticFileSegment::StorageChangeSets, last_block + 1)?;
        }
        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned storage history (changesets from static files)");

        let result = HistoryPruneResult {
            highest_deleted: highest_deleted_storages,
            last_pruned_block: last_changeset_pruned_block,
            pruned_count: pruned_changesets,
            done,
        };
        finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
            provider,
            result,
            range_end,
            &limiter,
            |(address, storage_key), block_number| {
                StorageShardedKey::new(address, storage_key, block_number)
            },
            |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
        )
        .map_err(Into::into)
    }

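    /// Prunes storage history when changesets are stored in MDBX database tables.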
    fn prune_database<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider<Tx: DbTxMut>,
    {
        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
            input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
        } else {
            input.limiter
        };

        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        // Deleted storage changeset keys (account addresses and storage slots) with the highest
        // block number deleted for that key.
        //
        // The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
        // STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current defaults it's usually `3500 * 5
        // / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
        // size should be up to ~0.5MB + some hashmap overhead. `blocks_since_last_run` is
        // additionally limited by the `max_reorg_depth`, so no OOM is expected here.
        let mut last_changeset_pruned_block = None;
        let mut highest_deleted_storages = FxHashMap::default();
        let (pruned_changesets, done) =
            provider.tx_ref().prune_table_with_range::<tables::StorageChangeSets>(
                BlockNumberAddress::range(range),
                &mut limiter,
                |_| false,
                |(BlockNumberAddress((block_number, address)), entry)| {
                    highest_deleted_storages.insert((address, entry.key), block_number);
                    last_changeset_pruned_block = Some(block_number);
                },
            )?;
        trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)");

        let result = HistoryPruneResult {
            highest_deleted: highest_deleted_storages,
            last_pruned_block: last_changeset_pruned_block,
            pruned_count: pruned_changesets,
            done,
        };
        finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
            provider,
            result,
            range_end,
            &limiter,
            |(address, storage_key), block_number| {
                StorageShardedKey::new(address, storage_key, block_number)
            },
            |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
        )
        .map_err(Into::into)
    }

    /// Prunes storage history when indices are stored in `RocksDB`.
    ///
    /// Reads storage changesets from static files and prunes the corresponding
    /// `RocksDB` history shards.
    fn prune_rocksdb<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider + StaticFileProviderFactory + RocksDBProviderFactory,
    {
        let mut limiter = input.limiter;

        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        let mut highest_deleted_storages: FxHashMap<_, _> = FxHashMap::default();
        let mut last_changeset_pruned_block = None;
        let mut changesets_processed = 0usize;
        let mut done = true;

        // Walk storage changesets from static files using a streaming iterator.
        // For each changeset, track the highest block number seen for each (address, storage_key)
        // pair to determine which history shard entries need pruning.
        let walker = provider.static_file_provider().walk_storage_changeset_range(range);
        for result in walker {
            if limiter.is_limit_reached() {
                done = false;
                break;
            }
            let (block_address, entry) = result?;
            let block_number = block_address.block_number();
            let address = block_address.address();
            highest_deleted_storages.insert((address, entry.key), block_number);
            last_changeset_pruned_block = Some(block_number);
            changesets_processed += 1;
            limiter.increment_deleted_entries_count();
        }

        trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned storage changesets from static files");

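        // If the limit was hit mid-block, the last walked block may be only partially pruned,
        // so fall back to the previous block for the checkpoint; with no changesets in range,
        // use the end of the requested range.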
        let last_changeset_pruned_block = last_changeset_pruned_block
            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
            .unwrap_or(range_end);

        // Prune RocksDB history shards for affected storage slots
        let mut deleted_shards = 0usize;
        let mut updated_shards = 0usize;

        // Sort by (address, storage_key) for better RocksDB cache locality
        let mut sorted_storages: Vec<_> = highest_deleted_storages.into_iter().collect();
        sorted_storages.sort_unstable_by_key(|((addr, key), _)| (*addr, *key));

        provider.with_rocksdb_batch(|mut batch| {
            let targets: Vec<_> = sorted_storages
                .iter()
                .map(|((addr, key), highest)| {
                    ((*addr, *key), (*highest).min(last_changeset_pruned_block))
                })
                .collect();

            let outcomes = batch.prune_storage_history_batch(&targets)?;
            deleted_shards = outcomes.deleted;
            updated_shards = outcomes.updated;

            Ok(((), Some(batch.into_inner())))
        })?;

        trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned storage history (RocksDB indices)");

        // Delete static file jars only when fully processed. During `provider.commit()`, the
        // RocksDB batch is committed before the MDBX checkpoint. If a crash occurs after the
        // RocksDB commit but before the MDBX commit, the pruner checkpoint will indicate on
        // restart that the data still needs pruning even though the RocksDB shards are already
        // pruned. This is safe because pruning is idempotent: re-pruning already-pruned shards
        // is a no-op.
        if done {
            provider.static_file_provider().delete_segment_below_block(
                StaticFileSegment::StorageChangeSets,
                last_changeset_pruned_block + 1,
            )?;
        }

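        // Report everything that was touched: changesets scanned plus RocksDB shards deleted
        // or updated.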
        let progress = limiter.progress(done);

        Ok(SegmentOutput {
            progress,
            pruned: changesets_processed + deleted_shards + updated_shards,
            checkpoint: Some(SegmentOutputCheckpoint {
                block_number: Some(last_changeset_pruned_block),
                tx_number: None,
            }),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::STORAGE_HISTORY_TABLES_TO_PRUNE;
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, StorageHistory};
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_storage_api::StorageSettingsCache;
    use reth_testing_utils::generators::{
        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
    };
    use std::{collections::BTreeMap, ops::AddAssign};

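    /// Exercises the legacy MDBX path (`StorageSettings::v1`): changesets and history shards
    /// live in database tables and are pruned across several runs under a deleted-entries limit.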
    #[test]
    fn prune_legacy() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            1..2,
            1..2,
        );
        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
                map
            },
        );
        assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        assert_eq!(
            db.table::<tables::StorageChangeSets>().unwrap().len(),
            changesets.iter().flatten().flat_map(|(_, _, entries)| entries).count()
        );

        let original_shards = db.table::<tables::StoragesHistory>().unwrap();

        let test_prune = |to_block: BlockNumber,
                          run: usize,
                          expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let deleted_entries_limit = 1000;
            let mut limiter =
                PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                to_block,
                limiter: limiter.clone(),
            };
            let segment = StorageHistory::new(prune_mode);

            let provider = db.factory.database_provider_rw().unwrap();
            provider.set_storage_settings_cache(StorageSettings::v1());
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            let changesets = changesets
                .iter()
                .enumerate()
                .flat_map(|(block_number, changeset)| {
                    changeset.iter().flat_map(move |(address, _, entries)| {
                        entries.iter().map(move |entry| (block_number, address, entry))
                    })
                })
                .collect::<Vec<_>>();

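            // Index of the first changeset entry that is still unpruned after `run` runs,
            // i.e. how many entries have been deleted so far.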
            #[expect(clippy::skip_while_next)]
            let pruned = changesets
                .iter()
                .enumerate()
                .skip_while(|(i, (block_number, _, _))| {
                    *i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
                        *block_number <= to_block as usize
                })
                .next()
                .map(|(i, _)| i)
                .unwrap_or_default();

            // Skip what we've pruned so far, subtracting one to get the last pruned block
            // number further down
            let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));

            let last_pruned_block_number = pruned_changesets
                .next()
                .map(|(block_number, _, _)| {
                    (if result.progress.is_finished() {
                        *block_number
                    } else {
                        block_number.saturating_sub(1)
                    }) as BlockNumber
                })
                .unwrap_or(to_block);

            let pruned_changesets = pruned_changesets.fold(
                BTreeMap::<_, Vec<_>>::new(),
                |mut acc, (block_number, address, entry)| {
                    acc.entry((block_number, address)).or_default().push(entry);
                    acc
                },
            );

            assert_eq!(
                db.table::<tables::StorageChangeSets>().unwrap().len(),
                pruned_changesets.values().flatten().count()
            );

            let actual_shards = db.table::<tables::StoragesHistory>().unwrap();

            let expected_shards = original_shards
                .iter()
                .filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
                .map(|(key, blocks)| {
                    let new_blocks =
                        blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                    (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                })
                .collect::<Vec<_>>();

            assert_eq!(actual_shards, expected_shards);

            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: Some(last_pruned_block_number),
                    tx_number: None,
                    prune_mode
                })
            );
        };

        test_prune(
            998,
            1,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
        );
        test_prune(998, 2, (PruneProgress::Finished, 499));
        test_prune(1200, 3, (PruneProgress::Finished, 202));
    }

    /// Tests that when a limiter stops mid-block (with multiple storage changes for the same
    /// block), the checkpoint is set to `block_number - 1` to avoid dangling index entries.
    #[test]
    fn prune_partial_progress_mid_block() {
        use alloy_primitives::{Address, U256};
        use reth_primitives_traits::Account;
        use reth_testing_utils::generators::ChangeSet;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Create blocks 0..=10
        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Create specific changesets where block 5 has 4 storage changes
        let addr1 = Address::with_last_byte(1);
        let addr2 = Address::with_last_byte(2);

        let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };

        // Create storage entries
        let storage_entry = |key: u8| reth_primitives_traits::StorageEntry {
            key: B256::with_last_byte(key),
            value: U256::from(100),
        };

        // Build changesets: blocks 0-4 have 1 storage change each, block 5 has 4 changes, block 6
        // has 1. Entries within each account must be sorted by key.
        let changesets: Vec<ChangeSet> = vec![
            vec![(addr1, account, vec![storage_entry(1)])], // block 0
            vec![(addr1, account, vec![storage_entry(1)])], // block 1
            vec![(addr1, account, vec![storage_entry(1)])], // block 2
            vec![(addr1, account, vec![storage_entry(1)])], // block 3
            vec![(addr1, account, vec![storage_entry(1)])], // block 4
            // block 5: 4 different storage changes (2 addresses, each with 2 storage slots)
            // Sorted by address, then by storage key within each address
            vec![
                (addr1, account, vec![storage_entry(1), storage_entry(2)]),
                (addr2, account, vec![storage_entry(1), storage_entry(2)]),
            ],
            vec![(addr1, account, vec![storage_entry(3)])], // block 6
        ];

        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        // Total storage changesets
        let total_storage_entries: usize =
            changesets.iter().flat_map(|c| c.iter()).map(|(_, _, entries)| entries.len()).sum();
        assert_eq!(db.table::<tables::StorageChangeSets>().unwrap().len(), total_storage_entries);

        let prune_mode = PruneMode::Before(10);

        // Set the limiter so that it stops mid-block 5.
        // With STORAGE_HISTORY_TABLES_TO_PRUNE = 2, a limit of 14 allows 7 storage entries.
        // Blocks 0-4 use 5 of them, leaving 2 for block 5 (which has 4), so we stop mid-block 5.
        let deleted_entries_limit = 14; // 14/2 = 7 storage entries before the limit
        let limiter = PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);

        let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
        let segment = StorageHistory::new(prune_mode);

        let provider = db.factory.database_provider_rw().unwrap();
        provider.set_storage_settings_cache(StorageSettings::v1());
        let result = segment.prune(&provider, input).unwrap();

        // Should report that there's more data
        assert!(!result.progress.is_finished(), "Expected HasMoreData since we stopped mid-block");

        // Save checkpoint and commit
        segment
            .save_checkpoint(&provider, result.checkpoint.unwrap().as_prune_checkpoint(prune_mode))
            .unwrap();
        provider.commit().expect("commit");

        // Verify the checkpoint is set to block 4 (not 5), since block 5 is incomplete
        let checkpoint = db
            .factory
            .provider()
            .unwrap()
            .get_prune_checkpoint(PruneSegment::StorageHistory)
            .unwrap()
            .expect("checkpoint should exist");

        assert_eq!(
            checkpoint.block_number,
            Some(4),
            "Checkpoint should be block 4 (block before incomplete block 5)"
        );

        // Verify remaining changesets
        let remaining_changesets = db.table::<tables::StorageChangeSets>().unwrap();
        assert!(
            !remaining_changesets.is_empty(),
            "Should have remaining changesets for blocks 5-6"
        );

        // Verify no dangling history indices for blocks that weren't fully pruned
        let history = db.table::<tables::StoragesHistory>().unwrap();
        for (key, _blocks) in &history {
            assert!(
                key.sharded_key.highest_block_number > 4,
                "Found stale history shard with highest_block_number {} <= checkpoint 4",
                key.sharded_key.highest_block_number
            );
        }

        // Run prune again to complete; it should finish processing blocks 5 and 6
        let input2 = PruneInput {
            previous_checkpoint: Some(checkpoint),
            to_block: 10,
            limiter: PruneLimiter::default().set_deleted_entries_limit(100), // high limit
        };

        let provider2 = db.factory.database_provider_rw().unwrap();
        provider2.set_storage_settings_cache(StorageSettings::v1());
        let result2 = segment.prune(&provider2, input2).unwrap();

        assert!(result2.progress.is_finished(), "Second run should complete");

        segment
            .save_checkpoint(
                &provider2,
                result2.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
            )
            .unwrap();
        provider2.commit().expect("commit");

        // Verify final checkpoint
        let final_checkpoint = db
            .factory
            .provider()
            .unwrap()
            .get_prune_checkpoint(PruneSegment::StorageHistory)
            .unwrap()
            .expect("checkpoint should exist");

        // Should now be at block 6 (the last block with changesets)
        assert_eq!(final_checkpoint.block_number, Some(6), "Final checkpoint should be at block 6");

        // All changesets should be pruned
        let final_changesets = db.table::<tables::StorageChangeSets>().unwrap();
        assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
    }

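    /// Exercises the `storage_v2` path (`StorageSettings::v2`): changesets are read from
    /// static files and the history shards are pruned from `RocksDB`.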
    #[test]
    fn prune_rocksdb() {
        use reth_db_api::models::storage_sharded_key::StorageShardedKey;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettings;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=100,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            1..2,
            1..2,
        );

        db.insert_changesets_to_static_files(changesets.clone(), None)
            .expect("insert changesets to static files");

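        // Build the expected per-(address, storage key) block index from the generated
        // changesets; this is what the RocksDB history shards should contain before pruning.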
        let mut storage_indices: BTreeMap<(alloy_primitives::Address, B256), Vec<u64>> =
            BTreeMap::new();
        for (block, changeset) in changesets.iter().enumerate() {
            for (address, _, storage_entries) in changeset {
                for entry in storage_entries {
                    storage_indices.entry((*address, entry.key)).or_default().push(block as u64);
                }
            }
        }

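        // Seed RocksDB with a single "last" shard per storage slot containing all of its
        // block numbers.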
        {
            let rocksdb = db.factory.rocksdb_provider();
            let mut batch = rocksdb.batch();
            for ((address, storage_key), block_numbers) in &storage_indices {
                let shard = BlockNumberList::new_pre_sorted(block_numbers.clone());
                batch
                    .put::<tables::StoragesHistory>(
                        StorageShardedKey::last(*address, *storage_key),
                        &shard,
                    )
                    .expect("insert storage history shard");
            }
            batch.commit().expect("commit rocksdb batch");
        }

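        // Sanity check: every seeded slot has at least one shard before pruning.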
        {
            let rocksdb = db.factory.rocksdb_provider();
            for (address, storage_key) in storage_indices.keys() {
                let shards = rocksdb.storage_history_shards(*address, *storage_key).unwrap();
                assert!(!shards.is_empty(), "RocksDB should contain storage history before prune");
            }
        }

        let to_block = 50u64;
        let prune_mode = PruneMode::Before(to_block);
        let input =
            PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
        let segment = StorageHistory::new(prune_mode);

        let provider = db.factory.database_provider_rw().unwrap();
        provider.set_storage_settings_cache(StorageSettings::v2());
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        assert_matches!(
            result,
            SegmentOutput { progress: PruneProgress::Finished, checkpoint: Some(_), .. }
        );

        {
            let rocksdb = db.factory.rocksdb_provider();
            for ((address, storage_key), block_numbers) in &storage_indices {
                let shards = rocksdb.storage_history_shards(*address, *storage_key).unwrap();

                let remaining_blocks: Vec<u64> =
                    block_numbers.iter().copied().filter(|&b| b > to_block).collect();

                if remaining_blocks.is_empty() {
                    assert!(
                        shards.is_empty(),
                        "Shard for {:?}/{:?} should be deleted when all blocks pruned",
                        address,
                        storage_key
                    );
                } else {
                    assert!(!shards.is_empty(), "Shard should exist with remaining blocks");
                    let actual_blocks: Vec<u64> =
                        shards.iter().flat_map(|(_, list)| list.iter()).collect();
                    assert_eq!(
                        actual_blocks, remaining_blocks,
                        "RocksDB shard should only contain blocks > {}",
                        to_block
                    );
                }
            }
        }
    }
}
775}