reth_prune/segments/user/account_history.rs

use crate::{
    db_ext::DbTxPruneExt,
    segments::{
        user::history::{finalize_history_prune, HistoryPruneResult},
        PruneInput, Segment,
    },
    PrunerError,
};
use alloy_primitives::BlockNumber;
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
use reth_provider::{
    changeset_walker::StaticFileAccountChangesetWalker, DBProvider, EitherWriter,
    RocksDBProviderFactory, StaticFileProviderFactory,
};
use reth_prune_types::{
    PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{ChangeSetReader, StorageSettingsCache};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};

/// Number of account history tables to prune in one step.
///
/// Account history consists of two tables: [`tables::AccountChangeSets`] (either in the database
/// or in static files) and [`tables::AccountsHistory`]. We want to prune them to the same block
/// number.
const ACCOUNT_HISTORY_TABLES_TO_PRUNE: usize = 2;
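
/// Prune segment for account history: removes account changesets (from the database or from
/// static files) together with the matching [`tables::AccountsHistory`] index entries.
///
/// A minimal usage sketch (not compiled as a doctest), assuming a provider that satisfies the
/// [`Segment`] bounds and a [`PruneInput`] built by the surrounding pruner:
///
/// ```ignore
/// let segment = AccountHistory::new(PruneMode::Before(1_000));
/// let output = segment.prune(&provider, input)?;
/// ```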
#[derive(Debug)]
pub struct AccountHistory {
    mode: PruneMode,
}

impl AccountHistory {
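    /// Creates a new [`AccountHistory`] segment with the given prune mode.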
    pub const fn new(mode: PruneMode) -> Self {
        Self { mode }
    }
}

impl<Provider> Segment<Provider> for AccountHistory
where
    Provider: DBProvider<Tx: DbTxMut>
        + StaticFileProviderFactory
        + StorageSettingsCache
        + ChangeSetReader
        + RocksDBProviderFactory,
{
    fn segment(&self) -> PruneSegment {
        PruneSegment::AccountHistory
    }

    fn mode(&self) -> Option<PruneMode> {
        Some(self.mode)
    }

    fn purpose(&self) -> PrunePurpose {
        PrunePurpose::User
    }

    #[instrument(
        name = "AccountHistory::prune",
        target = "pruner",
        skip(self, provider),
        ret(level = "trace")
    )]
    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
        let range = match input.get_next_block_range() {
            Some(range) => range,
            None => {
                trace!(target: "pruner", "No account history to prune");
                return Ok(SegmentOutput::done())
            }
        };
        let range_end = *range.end();

        // Check where account history indices are stored
        #[cfg(all(unix, feature = "rocksdb"))]
        if provider.cached_storage_settings().storage_v2 {
            return self.prune_rocksdb(provider, input, range, range_end);
        }

        // Check where account changesets are stored (MDBX path)
        if EitherWriter::account_changesets_destination(provider).is_static_file() {
            self.prune_static_files(provider, input, range, range_end)
        } else {
            self.prune_database(provider, input, range, range_end)
        }
    }
}

impl AccountHistory {
    /// Prunes account history when changesets are stored in static files.
    fn prune_static_files<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory + ChangeSetReader,
    {
        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
            input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
        } else {
            input.limiter
        };

        // The limiter may already be exhausted from a previous segment in the same prune run.
        // Early exit avoids unnecessary iteration when no budget remains.
        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        // Deleted account changeset keys (account addresses) with the highest block number deleted
        // for that key.
        //
        // The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
        // ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
        // 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
        // ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
        // `max_reorg_depth`, so no OOM is expected here.
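        // (8750 entries * (20-byte address + 8-byte block number) = 245,000 bytes ~= 0.25MB.)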
        let mut highest_deleted_accounts = FxHashMap::default();
        let mut last_changeset_pruned_block = None;
        let mut pruned_changesets = 0;
        let mut done = true;

        let walker = StaticFileAccountChangesetWalker::new(provider, range);
        for result in walker {
            if limiter.is_limit_reached() {
                done = false;
                break;
            }
            let (block_number, changeset) = result?;
            highest_deleted_accounts.insert(changeset.address, block_number);
            last_changeset_pruned_block = Some(block_number);
            pruned_changesets += 1;
            limiter.increment_deleted_entries_count();
        }

        // Delete static file jars only when fully processed
        if done && let Some(last_block) = last_changeset_pruned_block {
            provider
                .static_file_provider()
                .delete_segment_below_block(StaticFileSegment::AccountChangeSets, last_block + 1)?;
        }
        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from static files)");

        let result = HistoryPruneResult {
            highest_deleted: highest_deleted_accounts,
            last_pruned_block: last_changeset_pruned_block,
            pruned_count: pruned_changesets,
            done,
        };
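        // Prune the corresponding `AccountsHistory` index shards. The last two arguments supply
        // the sharded-key constructor and the predicate that treats shards with the same address
        // key as belonging to the same account.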
        finalize_history_prune::<_, tables::AccountsHistory, _, _>(
            provider,
            result,
            range_end,
            &limiter,
            ShardedKey::new,
            |a, b| a.key == b.key,
        )
        .map_err(Into::into)
    }
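
    /// Prunes account history when account changesets are stored in the database.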
    fn prune_database<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider<Tx: DbTxMut>,
    {
        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
            input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
        } else {
            input.limiter
        };

        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        // Deleted account changeset keys (account addresses) with the highest block number deleted
        // for that key.
        //
        // The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
        // ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
        // 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
        // ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
        // `max_reorg_depth`, so no OOM is expected here.
        let mut last_changeset_pruned_block = None;
        let mut highest_deleted_accounts = FxHashMap::default();
        let (pruned_changesets, done) =
            provider.tx_ref().prune_table_with_range::<tables::AccountChangeSets>(
                range,
                &mut limiter,
                |_| false,
                |(block_number, account)| {
                    highest_deleted_accounts.insert(account.address, block_number);
                    last_changeset_pruned_block = Some(block_number);
                },
            )?;
        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from database)");

        let result = HistoryPruneResult {
            highest_deleted: highest_deleted_accounts,
            last_pruned_block: last_changeset_pruned_block,
            pruned_count: pruned_changesets,
            done,
        };
        finalize_history_prune::<_, tables::AccountsHistory, _, _>(
            provider,
            result,
            range_end,
            &limiter,
            ShardedKey::new,
            |a, b| a.key == b.key,
        )
        .map_err(Into::into)
    }

    /// Prunes account history when indices are stored in `RocksDB`.
    ///
    /// Reads account changesets from static files and prunes the corresponding
    /// `RocksDB` history shards.
    #[cfg(all(unix, feature = "rocksdb"))]
    fn prune_rocksdb<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider + StaticFileProviderFactory + ChangeSetReader + RocksDBProviderFactory,
    {
        // Unlike the MDBX path, we don't divide the limit by 2 because the RocksDB path only
        // prunes history shards (there is no separate changeset table to delete from). The
        // changesets live in static files, which are deleted separately.
        let mut limiter = input.limiter;

        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        let mut highest_deleted_accounts = FxHashMap::default();
        let mut last_changeset_pruned_block = None;
        let mut changesets_processed = 0usize;
        let mut done = true;

        // Walk account changesets from static files using a streaming iterator.
        // For each changeset, track the highest block number seen for each address
        // to determine which history shard entries need pruning.
        let walker = StaticFileAccountChangesetWalker::new(provider, range);
        for result in walker {
            if limiter.is_limit_reached() {
                done = false;
                break;
            }
            let (block_number, changeset) = result?;
            highest_deleted_accounts.insert(changeset.address, block_number);
            last_changeset_pruned_block = Some(block_number);
            changesets_processed += 1;
            limiter.increment_deleted_entries_count();
        }
        trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned account changesets from static files");
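
        // If the walk stopped early, the last block seen may be only partially processed, so step
        // the checkpoint back one block so that it is re-processed on the next run. If the range
        // contained no changesets, there is nothing to prune and the checkpoint advances to the
        // end of the range.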
        let last_changeset_pruned_block = last_changeset_pruned_block
            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
            .unwrap_or(range_end);

        // Prune RocksDB history shards for affected accounts
        let mut deleted_shards = 0usize;
        let mut updated_shards = 0usize;

        // Sort by address for better RocksDB cache locality
        let mut sorted_accounts: Vec<_> = highest_deleted_accounts.into_iter().collect();
        sorted_accounts.sort_unstable_by_key(|(addr, _)| *addr);

        provider.with_rocksdb_batch(|mut batch| {
            let targets: Vec<_> = sorted_accounts
                .iter()
                .map(|(addr, highest)| (*addr, (*highest).min(last_changeset_pruned_block)))
                .collect();

            let outcomes = batch.prune_account_history_batch(&targets)?;
            deleted_shards = outcomes.deleted;
            updated_shards = outcomes.updated;

            Ok(((), Some(batch.into_inner())))
        })?;
        trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned account history (RocksDB indices)");

        // Delete static file jars only when fully processed. During `provider.commit()`, the
        // RocksDB batch is committed before the MDBX checkpoint. If a crash occurs after the
        // RocksDB commit but before the MDBX commit, the pruner checkpoint will indicate on
        // restart that the data still needs pruning even though the RocksDB shards are already
        // pruned. This is safe because pruning is idempotent: re-pruning already-pruned shards
        // is a no-op.
        if done {
            provider.static_file_provider().delete_segment_below_block(
                StaticFileSegment::AccountChangeSets,
                last_changeset_pruned_block + 1,
            )?;
        }

        let progress = limiter.progress(done);

        Ok(SegmentOutput {
            progress,
            pruned: changesets_processed + deleted_shards + updated_shards,
            checkpoint: Some(SegmentOutputCheckpoint {
                block_number: Some(last_changeset_pruned_block),
                tx_number: None,
            }),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::ACCOUNT_HISTORY_TABLES_TO_PRUNE;
    use crate::segments::{AccountHistory, PruneInput, PruneLimiter, Segment, SegmentOutput};
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_storage_api::StorageSettingsCache;
    use reth_testing_utils::generators::{
        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
    };
    use std::{collections::BTreeMap, ops::AddAssign};

    #[test]
    fn prune_legacy() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            0..0,
            0..0,
        );
        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry(key.key).or_default().add_assign(1);
                map
            },
        );
        assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        assert_eq!(
            db.table::<tables::AccountChangeSets>().unwrap().len(),
            changesets.iter().flatten().count()
        );

        let original_shards = db.table::<tables::AccountsHistory>().unwrap();

        let test_prune =
            |to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
                let prune_mode = PruneMode::Before(to_block);
                let deleted_entries_limit = 2000;
                let mut limiter =
                    PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
                let input = PruneInput {
                    previous_checkpoint: db
                        .factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::AccountHistory)
                        .unwrap(),
                    to_block,
                    limiter: limiter.clone(),
                };
                let segment = AccountHistory::new(prune_mode);

                let provider = db.factory.database_provider_rw().unwrap();
                provider.set_storage_settings_cache(StorageSettings::v1());
                let result = segment.prune(&provider, input).unwrap();
                limiter.increment_deleted_entries_count_by(result.pruned);

                assert_matches!(
                    result,
                    SegmentOutput {progress, pruned, checkpoint: Some(_)}
                        if (progress, pruned) == expected_result
                );

                segment
                    .save_checkpoint(
                        &provider,
                        result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                    )
                    .unwrap();
                provider.commit().expect("commit");

                let changesets = changesets
                    .iter()
                    .enumerate()
                    .flat_map(|(block_number, changeset)| {
                        changeset.iter().map(move |change| (block_number, change))
                    })
                    .collect::<Vec<_>>();

                #[expect(clippy::skip_while_next)]
                let pruned = changesets
                    .iter()
                    .enumerate()
                    .skip_while(|(i, (block_number, _))| {
                        *i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
                            *block_number <= to_block as usize
                    })
                    .next()
                    .map(|(i, _)| i)
                    .unwrap_or_default();
                // Skip what we've pruned so far, subtracting one so that the last pruned block
                // number can be read further down
                let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));

                let last_pruned_block_number = pruned_changesets
                    .next()
                    .map(|(block_number, _)| if result.progress.is_finished() {
                        *block_number
                    } else {
                        block_number.saturating_sub(1)
                    } as BlockNumber)
                    .unwrap_or(to_block);

                let pruned_changesets = pruned_changesets.fold(
                    BTreeMap::<_, Vec<_>>::new(),
                    |mut acc, (block_number, change)| {
                        acc.entry(block_number).or_default().push(change);
                        acc
                    },
                );

                assert_eq!(
                    db.table::<tables::AccountChangeSets>().unwrap().len(),
                    pruned_changesets.values().flatten().count()
                );

                let actual_shards = db.table::<tables::AccountsHistory>().unwrap();

                let expected_shards = original_shards
                    .iter()
                    .filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
                    .map(|(key, blocks)| {
                        let new_blocks =
                            blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                        (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                    })
                    .collect::<Vec<_>>();

                assert_eq!(actual_shards, expected_shards);

                assert_eq!(
                    db.factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::AccountHistory)
                        .unwrap(),
                    Some(PruneCheckpoint {
                        block_number: Some(last_pruned_block_number),
                        tx_number: None,
                        prune_mode
                    })
                );
            };
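
        // Run 1 stops at the deleted-entries limit; runs 2 and 3 finish pruning up to blocks
        // 998 and 1400 respectively.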
        test_prune(
            998,
            1,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
        );
        test_prune(998, 2, (PruneProgress::Finished, 998));
        test_prune(1400, 3, (PruneProgress::Finished, 804));
    }

    /// Tests the `prune_static_files` code path. On unix with the rocksdb feature, v2 storage
    /// routes to `prune_rocksdb` instead, so this test only runs without rocksdb (the
    /// `prune_rocksdb_path` test covers that configuration).
    #[test]
    #[cfg(not(all(unix, feature = "rocksdb")))]
    fn prune_static_file() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            0..0,
            0..0,
        );

        db.insert_changesets_to_static_files(changesets.clone(), None)
            .expect("insert changesets to static files");
        db.insert_history(changesets.clone(), None).expect("insert history");

        let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry(key.key).or_default().add_assign(1);
                map
            },
        );
        assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        let original_shards = db.table::<tables::AccountsHistory>().unwrap();

        let test_prune =
            |to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
                let prune_mode = PruneMode::Before(to_block);
                let deleted_entries_limit = 2000;
                let mut limiter =
                    PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
                let input = PruneInput {
                    previous_checkpoint: db
                        .factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::AccountHistory)
                        .unwrap(),
                    to_block,
                    limiter: limiter.clone(),
                };
                let segment = AccountHistory::new(prune_mode);

                let provider = db.factory.database_provider_rw().unwrap();
                provider.set_storage_settings_cache(StorageSettings::v2());
                let result = segment.prune(&provider, input).unwrap();
                limiter.increment_deleted_entries_count_by(result.pruned);

                assert_matches!(
                    result,
                    SegmentOutput {progress, pruned, checkpoint: Some(_)}
                        if (progress, pruned) == expected_result
                );

                segment
                    .save_checkpoint(
                        &provider,
                        result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                    )
                    .unwrap();
                provider.commit().expect("commit");

                let changesets = changesets
                    .iter()
                    .enumerate()
                    .flat_map(|(block_number, changeset)| {
                        changeset.iter().map(move |change| (block_number, change))
                    })
                    .collect::<Vec<_>>();

                #[expect(clippy::skip_while_next)]
                let pruned = changesets
                    .iter()
                    .enumerate()
                    .skip_while(|(i, (block_number, _))| {
                        *i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
                            *block_number <= to_block as usize
                    })
                    .next()
                    .map(|(i, _)| i)
                    .unwrap_or_default();

                // Skip what we've pruned so far, subtracting one so that the last pruned block
                // number can be read further down
                let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));

                let last_pruned_block_number = pruned_changesets
                    .next()
                    .map(|(block_number, _)| {
                        (if result.progress.is_finished() {
                            *block_number
                        } else {
                            block_number.saturating_sub(1)
                        }) as BlockNumber
                    })
                    .unwrap_or(to_block);

                let actual_shards = db.table::<tables::AccountsHistory>().unwrap();

                let expected_shards = original_shards
                    .iter()
                    .filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
                    .map(|(key, blocks)| {
                        let new_blocks =
                            blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                        (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                    })
                    .collect::<Vec<_>>();

                assert_eq!(actual_shards, expected_shards);

                assert_eq!(
                    db.factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::AccountHistory)
                        .unwrap(),
                    Some(PruneCheckpoint {
                        block_number: Some(last_pruned_block_number),
                        tx_number: None,
                        prune_mode
                    })
                );
            };
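
        // Same schedule as `prune_legacy`: run 1 hits the deleted-entries limit, runs 2 and 3
        // run to completion.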
        test_prune(
            998,
            1,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
        );
        test_prune(998, 2, (PruneProgress::Finished, 1000));
        test_prune(1400, 3, (PruneProgress::Finished, 804));
    }
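
    /// Tests the `prune_rocksdb` code path: with v2 storage settings, account history indices
    /// live in `RocksDB` shards while the account changesets are read from static files.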
    #[cfg(all(unix, feature = "rocksdb"))]
    #[test]
    fn prune_rocksdb_path() {
        use reth_db_api::models::ShardedKey;
        use reth_provider::{RocksDBProviderFactory, StaticFileProviderFactory};

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=100,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            0..0,
            0..0,
        );

        db.insert_changesets_to_static_files(changesets.clone(), None)
            .expect("insert changesets to static files");

        let mut account_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::new();
        for (block, changeset) in changesets.iter().enumerate() {
            for (address, _, _) in changeset {
                account_blocks.entry(*address).or_default().push(block as u64);
            }
        }

        let rocksdb = db.factory.rocksdb_provider();
        let mut batch = rocksdb.batch();
        for (address, block_numbers) in &account_blocks {
            let shard = BlockNumberList::new_pre_sorted(block_numbers.iter().copied());
            batch
                .put::<tables::AccountsHistory>(ShardedKey::new(*address, u64::MAX), &shard)
                .unwrap();
        }
        batch.commit().unwrap();
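
        // Sanity check: before pruning, each address has exactly one shard containing all of its
        // change blocks.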
        for (address, expected_blocks) in &account_blocks {
            let shards = rocksdb.account_history_shards(*address).unwrap();
            assert_eq!(shards.len(), 1);
            assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), *expected_blocks);
        }

        let to_block: BlockNumber = 50;
        let prune_mode = PruneMode::Before(to_block);
        let input =
            PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
        let segment = AccountHistory::new(prune_mode);

        db.factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = db.factory.database_provider_rw().unwrap();
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        assert_matches!(
            result,
            SegmentOutput { progress: PruneProgress::Finished, pruned, checkpoint: Some(_) }
                if pruned > 0
        );

        for (address, original_blocks) in &account_blocks {
            let shards = rocksdb.account_history_shards(*address).unwrap();

            let expected_blocks: Vec<u64> =
                original_blocks.iter().copied().filter(|b| *b > to_block).collect();

            if expected_blocks.is_empty() {
                assert!(
                    shards.is_empty(),
                    "Expected no shards for address {address:?} after pruning"
                );
            } else {
                assert_eq!(shards.len(), 1, "Expected 1 shard for address {address:?}");
                assert_eq!(
                    shards[0].1.iter().collect::<Vec<_>>(),
                    expected_blocks,
                    "Shard blocks mismatch for address {address:?}"
                );
            }
        }

        let static_file_provider = db.factory.static_file_provider();
        let highest_block = static_file_provider.get_highest_static_file_block(
            reth_static_file_types::StaticFileSegment::AccountChangeSets,
        );
        if let Some(block) = highest_block {
            assert!(
                block > to_block,
                "Static files should only contain blocks above to_block ({to_block}), got {block}"
            );
        }
    }

    /// Tests that when a limiter stops mid-block (with multiple changes for the same block),
    /// the checkpoint is set to `block_number - 1` to avoid dangling index entries.
    #[test]
    fn prune_partial_progress_mid_block() {
        use alloy_primitives::{Address, U256};
        use reth_primitives_traits::Account;
        use reth_testing_utils::generators::ChangeSet;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Create blocks 0..=10
        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Create specific changesets where block 5 has 4 account changes
        let addr1 = Address::with_last_byte(1);
        let addr2 = Address::with_last_byte(2);
        let addr3 = Address::with_last_byte(3);
        let addr4 = Address::with_last_byte(4);
        let addr5 = Address::with_last_byte(5);

        let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };

        // Build changesets: blocks 0-4 have 1 change each, block 5 has 4 changes, block 6 has 1
        let changesets: Vec<ChangeSet> = vec![
            vec![(addr1, account, vec![])], // block 0
            vec![(addr1, account, vec![])], // block 1
            vec![(addr1, account, vec![])], // block 2
            vec![(addr1, account, vec![])], // block 3
            vec![(addr1, account, vec![])], // block 4
            // block 5: 4 different account changes (sorted by address for consistency)
            vec![
                (addr1, account, vec![]),
                (addr2, account, vec![]),
                (addr3, account, vec![]),
                (addr4, account, vec![]),
            ],
            vec![(addr5, account, vec![])], // block 6
        ];

        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        // Total changesets: 5 (blocks 0-4) + 4 (block 5) + 1 (block 6) = 10
        assert_eq!(
            db.table::<tables::AccountChangeSets>().unwrap().len(),
            changesets.iter().flatten().count()
        );

        let prune_mode = PruneMode::Before(10);

        // Pick a limit that stops the pruner mid-block 5. Due to
        // ACCOUNT_HISTORY_TABLES_TO_PRUNE = 2, a deleted-entries limit of 14 gives 14 / 2 = 7
        // changeset slots: blocks 0-4 consume 5 of them, leaving 2 for block 5 (which has 4
        // changes), so pruning stops partway through block 5.
        let deleted_entries_limit = 14; // 14 / 2 = 7 changeset entries before the limit
        let limiter = PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);

        let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
        let segment = AccountHistory::new(prune_mode);

        let provider = db.factory.database_provider_rw().unwrap();
        provider.set_storage_settings_cache(StorageSettings::v1());
        let result = segment.prune(&provider, input).unwrap();

        // Should report that there's more data
        assert!(!result.progress.is_finished(), "Expected HasMoreData since we stopped mid-block");

        // Save checkpoint and commit
        segment
            .save_checkpoint(&provider, result.checkpoint.unwrap().as_prune_checkpoint(prune_mode))
            .unwrap();
        provider.commit().expect("commit");

        // Verify checkpoint is set to block 4 (not 5), since block 5 is incomplete
        let checkpoint = db
            .factory
            .provider()
            .unwrap()
            .get_prune_checkpoint(PruneSegment::AccountHistory)
            .unwrap()
            .expect("checkpoint should exist");

        assert_eq!(
            checkpoint.block_number,
            Some(4),
            "Checkpoint should be block 4 (block before incomplete block 5)"
        );

        // Verify remaining changesets (blocks 5 and 6 should still have entries)
        let remaining_changesets = db.table::<tables::AccountChangeSets>().unwrap();
        // After fully pruning blocks 0-4, block 5 (4 entries) and block 6 (1 entry) would remain.
        // Since we stopped mid-block 5, some of block 5 may already be pruned, but the checkpoint
        // is 4, so the re-run below re-processes from block 5.
        assert!(
            !remaining_changesets.is_empty(),
            "Should have remaining changesets for blocks 5-6"
        );

        // Verify there are no dangling history indices for blocks that weren't fully pruned
        let history = db.table::<tables::AccountsHistory>().unwrap();
        for (key, _blocks) in &history {
            // Every remaining shard must have a highest_block_number above the checkpoint (4);
            // anything lower would be a stale, fully pruned shard.
            assert!(
                key.highest_block_number > 4,
                "Found stale history shard with highest_block_number {} <= checkpoint 4",
                key.highest_block_number
            );
        }

        // Run prune again to complete; the second run should finish processing blocks 5 and 6
        let input2 = PruneInput {
            previous_checkpoint: Some(checkpoint),
            to_block: 10,
            limiter: PruneLimiter::default().set_deleted_entries_limit(100), // high limit
        };

        let provider2 = db.factory.database_provider_rw().unwrap();
        provider2.set_storage_settings_cache(StorageSettings::v1());
        let result2 = segment.prune(&provider2, input2).unwrap();

        assert!(result2.progress.is_finished(), "Second run should complete");

        segment
            .save_checkpoint(
                &provider2,
                result2.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
            )
            .unwrap();
        provider2.commit().expect("commit");

        // Verify final checkpoint
        let final_checkpoint = db
            .factory
            .provider()
            .unwrap()
            .get_prune_checkpoint(PruneSegment::AccountHistory)
            .unwrap()
            .expect("checkpoint should exist");

        // Should now be at block 6 (the last block with changesets)
        assert_eq!(final_checkpoint.block_number, Some(6), "Final checkpoint should be at block 6");

        // All changesets should be pruned
        let final_changesets = db.table::<tables::AccountChangeSets>().unwrap();
        assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
    }
}