
reth_prune/segments/user/storage_history.rs

use crate::{
    db_ext::DbTxPruneExt,
    segments::{
        user::history::{finalize_history_prune, HistoryPruneResult},
        PruneInput, Segment,
    },
    PrunerError,
};
use alloy_primitives::{Address, BlockNumber, B256};
use reth_db_api::{
    models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress},
    tables,
    transaction::DbTxMut,
};
use reth_provider::{DBProvider, EitherWriter, RocksDBProviderFactory, StaticFileProviderFactory};
use reth_prune_types::{
    PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};

/// Number of storage history tables to prune in one step.
///
/// Storage History consists of two tables: [`tables::StorageChangeSets`] and
/// [`tables::StoragesHistory`]. We want to prune them to the same block number.
const STORAGE_HISTORY_TABLES_TO_PRUNE: usize = 2;
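// For example, with a deleted-entries limit of 1000, a single prune pass deletes at most
// 1000 / STORAGE_HISTORY_TABLES_TO_PRUNE = 500 changeset entries (matching the expectations
// in the tests below).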

/// Segment responsible for pruning the storage history tables
/// ([`tables::StorageChangeSets`] and [`tables::StoragesHistory`]).
#[derive(Debug)]
pub struct StorageHistory {
    mode: PruneMode,
}

impl StorageHistory {
    /// Creates a new [`StorageHistory`] segment with the given prune mode.
    pub const fn new(mode: PruneMode) -> Self {
        Self { mode }
    }
}

impl<Provider> Segment<Provider> for StorageHistory
where
    Provider: DBProvider<Tx: DbTxMut>
        + StaticFileProviderFactory
        + StorageChangeSetReader
        + StorageSettingsCache
        + RocksDBProviderFactory,
{
    fn segment(&self) -> PruneSegment {
        PruneSegment::StorageHistory
    }

    fn mode(&self) -> Option<PruneMode> {
        Some(self.mode)
    }

    fn purpose(&self) -> PrunePurpose {
        PrunePurpose::User
    }

    #[instrument(
        name = "StorageHistory::prune",
        target = "pruner",
        skip(self, provider),
        ret(level = "trace")
    )]
    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
        let range = match input.get_next_block_range() {
            Some(range) => range,
            None => {
                trace!(target: "pruner", "No storage history to prune");
                return Ok(SegmentOutput::done())
            }
        };
        let range_end = *range.end();

        // Check where storage history indices are stored
        #[cfg(all(unix, feature = "rocksdb"))]
        if provider.cached_storage_settings().storage_v2 {
            return self.prune_rocksdb(provider, input, range, range_end);
        }

        // Check where storage changesets are stored (MDBX path)
        if EitherWriter::storage_changesets_destination(provider).is_static_file() {
            self.prune_static_files(provider, input, range, range_end)
        } else {
            self.prune_database(provider, input, range, range_end)
        }
    }
}

impl StorageHistory {
    /// Prunes storage history when changesets are stored in static files.
    fn prune_static_files<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory,
    {
        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
            input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
        } else {
            input.limiter
        };

        // The limiter may already be exhausted from a previous segment in the same prune run.
        // Early exit avoids unnecessary iteration when no budget remains.
        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        // The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
        // STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current defaults it's usually `3500 * 5
        // / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
        // size should be up to ~0.5MB + some hashmap overhead. `blocks_since_last_run` is
        // additionally limited by the `max_reorg_depth`, so no OOM is expected here.
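        // Roughly: 8750 entries * (20 B address + 32 B storage key + 8 B block number) = ~525 KB.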
        let mut highest_deleted_storages = FxHashMap::default();
        let mut last_changeset_pruned_block = None;
        let mut pruned_changesets = 0;
        let mut done = true;

        let walker = provider.static_file_provider().walk_storage_changeset_range(range);
        for result in walker {
            if limiter.is_limit_reached() {
                done = false;
                break;
            }
            let (block_address, entry) = result?;
            let block_number = block_address.block_number();
            let address = block_address.address();
            highest_deleted_storages.insert((address, entry.key.as_b256()), block_number);
            last_changeset_pruned_block = Some(block_number);
            pruned_changesets += 1;
            limiter.increment_deleted_entries_count();
        }

        // Delete static file jars only when fully processed
        if done && let Some(last_block) = last_changeset_pruned_block {
            provider
                .static_file_provider()
                .delete_segment_below_block(StaticFileSegment::StorageChangeSets, last_block + 1)?;
        }
        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned storage history (changesets from static files)");

        let result = HistoryPruneResult {
            highest_deleted: highest_deleted_storages,
            last_pruned_block: last_changeset_pruned_block,
            pruned_count: pruned_changesets,
            done,
        };
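        // Finish by pruning the corresponding `StoragesHistory` shards: the first closure
        // builds the sharded key for an (address, storage slot) at a given block number, the
        // second checks whether two sharded keys refer to the same (address, storage slot) pair.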
        finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
            provider,
            result,
            range_end,
            &limiter,
            |(address, storage_key), block_number| {
                StorageShardedKey::new(address, storage_key, block_number)
            },
            |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
        )
        .map_err(Into::into)
    }

    /// Prunes storage history when changesets are stored in the database.
    fn prune_database<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider<Tx: DbTxMut>,
    {
        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
            input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
        } else {
            input.limiter
        };

        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        // Deleted storage changeset keys (account addresses and storage slots) with the highest
        // block number deleted for that key.
        //
        // The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
        // STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current defaults it's usually `3500 * 5
        // / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
        // size should be up to ~0.5MB + some hashmap overhead. `blocks_since_last_run` is
        // additionally limited by the `max_reorg_depth`, so no OOM is expected here.
        let mut last_changeset_pruned_block = None;
        let mut highest_deleted_storages = FxHashMap::default();
        let (pruned_changesets, done) =
            provider.tx_ref().prune_table_with_range::<tables::StorageChangeSets>(
                BlockNumberAddress::range(range),
                &mut limiter,
                |_| false,
                |(BlockNumberAddress((block_number, address)), entry)| {
                    highest_deleted_storages.insert((address, entry.key), block_number);
                    last_changeset_pruned_block = Some(block_number);
                },
            )?;
        trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)");

        let result = HistoryPruneResult {
            highest_deleted: highest_deleted_storages,
            last_pruned_block: last_changeset_pruned_block,
            pruned_count: pruned_changesets,
            done,
        };
        finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
            provider,
            result,
            range_end,
            &limiter,
            |(address, storage_key), block_number| {
                StorageShardedKey::new(address, storage_key, block_number)
            },
            |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
        )
        .map_err(Into::into)
    }

    /// Prunes storage history when indices are stored in `RocksDB`.
    ///
    /// Reads storage changesets from static files and prunes the corresponding
    /// `RocksDB` history shards.
    #[cfg(all(unix, feature = "rocksdb"))]
    fn prune_rocksdb<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider + StaticFileProviderFactory + RocksDBProviderFactory,
    {
        let mut limiter = input.limiter;

        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        let mut highest_deleted_storages: FxHashMap<_, _> = FxHashMap::default();
        let mut last_changeset_pruned_block = None;
        let mut changesets_processed = 0usize;
        let mut done = true;

        // Walk storage changesets from static files using a streaming iterator.
        // For each changeset, track the highest block number seen for each (address, storage_key)
        // pair to determine which history shard entries need pruning.
        let walker = provider.static_file_provider().walk_storage_changeset_range(range);
        for result in walker {
            if limiter.is_limit_reached() {
                done = false;
                break;
            }
            let (block_address, entry) = result?;
            let block_number = block_address.block_number();
            let address = block_address.address();
            highest_deleted_storages.insert((address, entry.key.as_b256()), block_number);
            last_changeset_pruned_block = Some(block_number);
            changesets_processed += 1;
            limiter.increment_deleted_entries_count();
        }

        trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned storage changesets from static files");

        let last_changeset_pruned_block = last_changeset_pruned_block
            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
            .unwrap_or(range_end);

        // Prune RocksDB history shards for affected storage slots
        let mut deleted_shards = 0usize;
        let mut updated_shards = 0usize;

        // Sort by (address, storage_key) for better RocksDB cache locality
        let mut sorted_storages: Vec<_> = highest_deleted_storages.into_iter().collect();
        sorted_storages.sort_unstable_by_key(|((addr, key), _)| (*addr, *key));

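        // Apply all shard deletions/updates in one RocksDB write batch; returning
        // `Some(batch.into_inner())` hands the batch back so it is committed as part of
        // `provider.commit()` (see the crash-safety note below).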
        provider.with_rocksdb_batch(|mut batch| {
            let targets: Vec<_> = sorted_storages
                .iter()
                .map(|((addr, key), highest)| {
                    ((*addr, *key), (*highest).min(last_changeset_pruned_block))
                })
                .collect();

            let outcomes = batch.prune_storage_history_batch(&targets)?;
            deleted_shards = outcomes.deleted;
            updated_shards = outcomes.updated;

            Ok(((), Some(batch.into_inner())))
        })?;

        trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned storage history (RocksDB indices)");

        // Delete static file jars only when fully processed. During provider.commit(), the
        // RocksDB batch is committed before the MDBX checkpoint. If a crash occurs after the
        // RocksDB commit but before the MDBX commit, on restart the pruner checkpoint indicates
        // the data needs re-pruning, but the RocksDB shards are already pruned - this is safe
        // because pruning is idempotent (re-pruning already-pruned shards is a no-op).
        if done {
            provider.static_file_provider().delete_segment_below_block(
                StaticFileSegment::StorageChangeSets,
                last_changeset_pruned_block + 1,
            )?;
        }

        let progress = limiter.progress(done);

        Ok(SegmentOutput {
            progress,
            pruned: changesets_processed + deleted_shards + updated_shards,
            checkpoint: Some(SegmentOutputCheckpoint {
                block_number: Some(last_changeset_pruned_block),
                tx_number: None,
            }),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::STORAGE_HISTORY_TABLES_TO_PRUNE;
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, StorageHistory};
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_storage_api::StorageSettingsCache;
    use reth_testing_utils::generators::{
        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
    };
    use std::{collections::BTreeMap, ops::AddAssign};

    #[test]
    fn prune_legacy() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            1..2,
            1..2,
        );
        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
                map
            },
        );
        assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        assert_eq!(
            db.table::<tables::StorageChangeSets>().unwrap().len(),
            changesets.iter().flatten().flat_map(|(_, _, entries)| entries).count()
        );

        let original_shards = db.table::<tables::StoragesHistory>().unwrap();

        let test_prune = |to_block: BlockNumber,
                          run: usize,
                          expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let deleted_entries_limit = 1000;
            let mut limiter =
                PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                to_block,
                limiter: limiter.clone(),
            };
            let segment = StorageHistory::new(prune_mode);

            let provider = db.factory.database_provider_rw().unwrap();
            provider.set_storage_settings_cache(StorageSettings::v1());
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            let changesets = changesets
                .iter()
                .enumerate()
                .flat_map(|(block_number, changeset)| {
                    changeset.iter().flat_map(move |(address, _, entries)| {
                        entries.iter().map(move |entry| (block_number, address, entry))
                    })
                })
                .collect::<Vec<_>>();

            #[expect(clippy::skip_while_next)]
            let pruned = changesets
                .iter()
                .enumerate()
                .skip_while(|(i, (block_number, _, _))| {
                    *i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
                        *block_number <= to_block as usize
                })
                .next()
                .map(|(i, _)| i)
                .unwrap_or_default();

            // Skip what we've pruned so far, subtracting one to get last pruned block number
            // further down
            let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));

            let last_pruned_block_number = pruned_changesets
                .next()
                .map(|(block_number, _, _)| {
                    (if result.progress.is_finished() {
                        *block_number
                    } else {
                        block_number.saturating_sub(1)
                    }) as BlockNumber
                })
                .unwrap_or(to_block);

            let pruned_changesets = pruned_changesets.fold(
                BTreeMap::<_, Vec<_>>::new(),
                |mut acc, (block_number, address, entry)| {
                    acc.entry((block_number, address)).or_default().push(entry);
                    acc
                },
            );

            assert_eq!(
                db.table::<tables::StorageChangeSets>().unwrap().len(),
                pruned_changesets.values().flatten().count()
            );

            let actual_shards = db.table::<tables::StoragesHistory>().unwrap();

            let expected_shards = original_shards
                .iter()
                .filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
                .map(|(key, blocks)| {
                    let new_blocks =
                        blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                    (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                })
                .collect::<Vec<_>>();

            assert_eq!(actual_shards, expected_shards);

            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: Some(last_pruned_block_number),
                    tx_number: None,
                    prune_mode
                })
            );
        };

        test_prune(
            998,
            1,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
        );
        test_prune(998, 2, (PruneProgress::Finished, 499));
        test_prune(1200, 3, (PruneProgress::Finished, 202));
    }

    /// Tests the `prune_static_files` code path. On unix with the rocksdb feature, v2 storage
    /// routes to `prune_rocksdb` instead, so this test only runs without rocksdb (the
    /// `prune_rocksdb` test covers that configuration).
    #[test]
    #[cfg(not(all(unix, feature = "rocksdb")))]
    fn prune_static_file() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            1..2,
            1..2,
        );

        db.insert_changesets_to_static_files(changesets.clone(), None)
            .expect("insert changesets to static files");
        db.insert_history(changesets.clone(), None).expect("insert history");

        let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
                map
            },
        );
        assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        let original_shards = db.table::<tables::StoragesHistory>().unwrap();

        let test_prune = |to_block: BlockNumber,
                          run: usize,
                          expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let deleted_entries_limit = 1000;
            let mut limiter =
                PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                to_block,
                limiter: limiter.clone(),
            };
            let segment = StorageHistory::new(prune_mode);

            let provider = db.factory.database_provider_rw().unwrap();
            provider.set_storage_settings_cache(StorageSettings::v2());
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            let changesets = changesets
                .iter()
                .enumerate()
                .flat_map(|(block_number, changeset)| {
                    changeset.iter().flat_map(move |(address, _, entries)| {
                        entries.iter().map(move |entry| (block_number, address, entry))
                    })
                })
                .collect::<Vec<_>>();

            #[expect(clippy::skip_while_next)]
            let pruned = changesets
                .iter()
                .enumerate()
                .skip_while(|(i, (block_number, _, _))| {
                    *i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
                        *block_number <= to_block as usize
                })
                .next()
                .map(|(i, _)| i)
                .unwrap_or_default();

            // Skip what we've pruned so far, subtracting one to get last pruned block number
            // further down
            let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));

            let last_pruned_block_number = pruned_changesets
                .next()
                .map(|(block_number, _, _)| {
                    (if result.progress.is_finished() {
                        *block_number
                    } else {
                        block_number.saturating_sub(1)
                    }) as BlockNumber
                })
                .unwrap_or(to_block);

            let actual_shards = db.table::<tables::StoragesHistory>().unwrap();

            let expected_shards = original_shards
                .iter()
                .filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
                .map(|(key, blocks)| {
                    let new_blocks =
                        blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                    (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                })
                .collect::<Vec<_>>();

            assert_eq!(actual_shards, expected_shards);

            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: Some(last_pruned_block_number),
                    tx_number: None,
                    prune_mode
                })
            );
        };

        test_prune(
            998,
            1,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
        );
        test_prune(998, 2, (PruneProgress::Finished, 500));
        test_prune(1200, 3, (PruneProgress::Finished, 202));
    }

    /// Tests that when a limiter stops mid-block (with multiple storage changes for the same
    /// block), the checkpoint is set to `block_number - 1` to avoid dangling index entries.
    #[test]
    fn prune_partial_progress_mid_block() {
        use alloy_primitives::{Address, U256};
        use reth_primitives_traits::Account;
        use reth_testing_utils::generators::ChangeSet;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Create blocks 0..=10
        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Create specific changesets where block 5 has 4 storage changes
        let addr1 = Address::with_last_byte(1);
        let addr2 = Address::with_last_byte(2);

        let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };

        // Create storage entries
        let storage_entry = |key: u8| reth_primitives_traits::StorageEntry {
            key: B256::with_last_byte(key),
            value: U256::from(100),
        };

        // Build changesets: blocks 0-4 have 1 storage change each, block 5 has 4 changes, block 6
        // has 1. Entries within each account must be sorted by key.
        let changesets: Vec<ChangeSet> = vec![
            vec![(addr1, account, vec![storage_entry(1)])], // block 0
            vec![(addr1, account, vec![storage_entry(1)])], // block 1
            vec![(addr1, account, vec![storage_entry(1)])], // block 2
            vec![(addr1, account, vec![storage_entry(1)])], // block 3
            vec![(addr1, account, vec![storage_entry(1)])], // block 4
            // block 5: 4 different storage changes (2 addresses, each with 2 storage slots)
            // Sorted by address, then by storage key within each address
            vec![
                (addr1, account, vec![storage_entry(1), storage_entry(2)]),
                (addr2, account, vec![storage_entry(1), storage_entry(2)]),
            ],
            vec![(addr1, account, vec![storage_entry(3)])], // block 6
        ];

        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        // Total storage changesets
        let total_storage_entries: usize =
            changesets.iter().flat_map(|c| c.iter()).map(|(_, _, entries)| entries.len()).sum();
        assert_eq!(db.table::<tables::StorageChangeSets>().unwrap().len(), total_storage_entries);

        let prune_mode = PruneMode::Before(10);

        // Set limiter to stop mid-block 5
        // With STORAGE_HISTORY_TABLES_TO_PRUNE=2, limit=14 gives us 7 storage entries before limit
        // Blocks 0-4 use 5 slots, leaving 2 for block 5 (which has 4), so we stop mid-block 5
        let deleted_entries_limit = 14; // 14/2 = 7 storage entries before limit
        let limiter = PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);

        let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
        let segment = StorageHistory::new(prune_mode);

        let provider = db.factory.database_provider_rw().unwrap();
        provider.set_storage_settings_cache(StorageSettings::v1());
        let result = segment.prune(&provider, input).unwrap();

        // Should report that there's more data
        assert!(!result.progress.is_finished(), "Expected HasMoreData since we stopped mid-block");

        // Save checkpoint and commit
        segment
            .save_checkpoint(&provider, result.checkpoint.unwrap().as_prune_checkpoint(prune_mode))
            .unwrap();
        provider.commit().expect("commit");

        // Verify checkpoint is set to block 4 (not 5), since block 5 is incomplete
        let checkpoint = db
            .factory
            .provider()
            .unwrap()
            .get_prune_checkpoint(PruneSegment::StorageHistory)
            .unwrap()
            .expect("checkpoint should exist");

        assert_eq!(
            checkpoint.block_number,
            Some(4),
            "Checkpoint should be block 4 (block before incomplete block 5)"
        );

        // Verify remaining changesets
        let remaining_changesets = db.table::<tables::StorageChangeSets>().unwrap();
        assert!(
            !remaining_changesets.is_empty(),
            "Should have remaining changesets for blocks 5-6"
        );

        // Verify no dangling history indices for blocks that weren't fully pruned
        let history = db.table::<tables::StoragesHistory>().unwrap();
        for (key, _blocks) in &history {
            assert!(
                key.sharded_key.highest_block_number > 4,
                "Found stale history shard with highest_block_number {} <= checkpoint 4",
                key.sharded_key.highest_block_number
            );
        }

        // Run prune again to complete - should finish processing blocks 5 and 6
        let input2 = PruneInput {
            previous_checkpoint: Some(checkpoint),
            to_block: 10,
            limiter: PruneLimiter::default().set_deleted_entries_limit(100), // high limit
        };

        let provider2 = db.factory.database_provider_rw().unwrap();
        provider2.set_storage_settings_cache(StorageSettings::v1());
        let result2 = segment.prune(&provider2, input2).unwrap();

        assert!(result2.progress.is_finished(), "Second run should complete");

        segment
            .save_checkpoint(
                &provider2,
                result2.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
            )
            .unwrap();
        provider2.commit().expect("commit");

        // Verify final checkpoint
        let final_checkpoint = db
            .factory
            .provider()
            .unwrap()
            .get_prune_checkpoint(PruneSegment::StorageHistory)
            .unwrap()
            .expect("checkpoint should exist");

        // Should now be at block 6 (the last block with changesets)
        assert_eq!(final_checkpoint.block_number, Some(6), "Final checkpoint should be at block 6");

        // All changesets should be pruned
        let final_changesets = db.table::<tables::StorageChangeSets>().unwrap();
        assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
    }

    #[cfg(all(unix, feature = "rocksdb"))]
    #[test]
    fn prune_rocksdb() {
        use reth_db_api::models::storage_sharded_key::StorageShardedKey;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettings;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=100,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            1..2,
            1..2,
        );

        db.insert_changesets_to_static_files(changesets.clone(), None)
            .expect("insert changesets to static files");

        let mut storage_indices: BTreeMap<(alloy_primitives::Address, B256), Vec<u64>> =
            BTreeMap::new();
        for (block, changeset) in changesets.iter().enumerate() {
            for (address, _, storage_entries) in changeset {
                for entry in storage_entries {
                    storage_indices.entry((*address, entry.key)).or_default().push(block as u64);
                }
            }
        }

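        // Seed the RocksDB storage history index: one shard per (address, storage slot)
        // containing every block number at which that slot changed.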
        {
            let rocksdb = db.factory.rocksdb_provider();
            let mut batch = rocksdb.batch();
            for ((address, storage_key), block_numbers) in &storage_indices {
                let shard = BlockNumberList::new_pre_sorted(block_numbers.clone());
                batch
                    .put::<tables::StoragesHistory>(
                        StorageShardedKey::last(*address, *storage_key),
                        &shard,
                    )
                    .expect("insert storage history shard");
            }
            batch.commit().expect("commit rocksdb batch");
        }

        {
            let rocksdb = db.factory.rocksdb_provider();
            for (address, storage_key) in storage_indices.keys() {
                let shards = rocksdb.storage_history_shards(*address, *storage_key).unwrap();
                assert!(!shards.is_empty(), "RocksDB should contain storage history before prune");
            }
        }

        let to_block = 50u64;
        let prune_mode = PruneMode::Before(to_block);
        let input =
            PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
        let segment = StorageHistory::new(prune_mode);

        let provider = db.factory.database_provider_rw().unwrap();
        provider.set_storage_settings_cache(StorageSettings::v2());
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        assert_matches!(
            result,
            SegmentOutput { progress: PruneProgress::Finished, checkpoint: Some(_), .. }
        );

        {
            let rocksdb = db.factory.rocksdb_provider();
            for ((address, storage_key), block_numbers) in &storage_indices {
                let shards = rocksdb.storage_history_shards(*address, *storage_key).unwrap();

                let remaining_blocks: Vec<u64> =
                    block_numbers.iter().copied().filter(|&b| b > to_block).collect();

                if remaining_blocks.is_empty() {
                    assert!(
                        shards.is_empty(),
                        "Shard for {:?}/{:?} should be deleted when all blocks pruned",
                        address,
                        storage_key
                    );
                } else {
                    assert!(!shards.is_empty(), "Shard should exist with remaining blocks");
                    let actual_blocks: Vec<u64> =
                        shards.iter().flat_map(|(_, list)| list.iter()).collect();
                    assert_eq!(
                        actual_blocks, remaining_blocks,
                        "RocksDB shard should only contain blocks > {}",
                        to_block
                    );
                }
            }
        }
    }
931}