reth_prune/segments/user/account_history.rs

use crate::{
    db_ext::DbTxPruneExt,
    segments::{
        user::history::{finalize_history_prune, HistoryPruneResult},
        PruneInput, Segment,
    },
    PrunerError,
};
use alloy_primitives::BlockNumber;
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
use reth_provider::{
    changeset_walker::StaticFileAccountChangesetWalker, DBProvider, EitherWriter,
    RocksDBProviderFactory, StaticFileProviderFactory,
};
use reth_prune_types::{
    PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{ChangeSetReader, StorageSettingsCache};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};

/// Number of account history tables to prune in one step.
///
/// Account History consists of two tables: [`tables::AccountChangeSets`] (either in the database
/// or static files) and [`tables::AccountsHistory`]. We want to prune them to the same block
/// number.
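///
/// For example, with the default `prune_delete_limit` of 3500 deleted entries per run, each table
/// gets a budget of 3500 / 2 = 1750 entries.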
const ACCOUNT_HISTORY_TABLES_TO_PRUNE: usize = 2;

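/// Segment responsible for pruning account history: the account changesets and the
/// [`tables::AccountsHistory`] index that points into them.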
#[derive(Debug)]
pub struct AccountHistory {
    mode: PruneMode,
}

impl AccountHistory {
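    /// Creates a new [`AccountHistory`] segment that prunes according to the given mode.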
    pub const fn new(mode: PruneMode) -> Self {
        Self { mode }
    }
}

impl<Provider> Segment<Provider> for AccountHistory
where
    Provider: DBProvider<Tx: DbTxMut>
        + StaticFileProviderFactory
        + StorageSettingsCache
        + ChangeSetReader
        + RocksDBProviderFactory,
{
    fn segment(&self) -> PruneSegment {
        PruneSegment::AccountHistory
    }

    fn mode(&self) -> Option<PruneMode> {
        Some(self.mode)
    }

    fn purpose(&self) -> PrunePurpose {
        PrunePurpose::User
    }

    #[instrument(
        name = "AccountHistory::prune",
        target = "pruner",
        skip(self, provider),
        ret(level = "trace")
    )]
    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
        let range = match input.get_next_block_range() {
            Some(range) => range,
            None => {
                trace!(target: "pruner", "No account history to prune");
                return Ok(SegmentOutput::done())
            }
        };
        let range_end = *range.end();

        // Check where account history indices are stored
        if provider.cached_storage_settings().storage_v2 {
            return self.prune_rocksdb(provider, input, range, range_end);
        }

        // Check where account changesets are stored (MDBX path)
        if EitherWriter::account_changesets_destination(provider).is_static_file() {
            self.prune_static_files(provider, input, range, range_end)
        } else {
            self.prune_database(provider, input, range, range_end)
        }
    }
}

impl AccountHistory {
    /// Prunes account history when changesets are stored in static files.
    fn prune_static_files<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory + ChangeSetReader,
    {
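        // Split the deleted-entries budget evenly between the changeset data and the history
        // index so both advance to the same block (see `ACCOUNT_HISTORY_TABLES_TO_PRUNE`).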
        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
            input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
        } else {
            input.limiter
        };

        // The limiter may already be exhausted from a previous segment in the same prune run.
        // Early exit avoids unnecessary iteration when no budget remains.
        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        // Deleted account changeset keys (account addresses) with the highest block number deleted
        // for that key.
        //
        // The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
        // ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
        // 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
        // ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
        // `max_reorg_depth`, so no OOM is expected here.
        let mut highest_deleted_accounts = FxHashMap::default();
        let mut last_changeset_pruned_block = None;
        let mut pruned_changesets = 0;
        let mut done = true;

        let walker = StaticFileAccountChangesetWalker::new(provider, range);
        for result in walker {
            if limiter.is_limit_reached() {
                done = false;
                break;
            }
            let (block_number, changeset) = result?;
            highest_deleted_accounts.insert(changeset.address, block_number);
            last_changeset_pruned_block = Some(block_number);
            pruned_changesets += 1;
            limiter.increment_deleted_entries_count();
        }

        // Delete static file jars only when fully processed
        if done && let Some(last_block) = last_changeset_pruned_block {
            provider
                .static_file_provider()
                .delete_segment_below_block(StaticFileSegment::AccountChangeSets, last_block + 1)?;
        }
        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from static files)");

        let result = HistoryPruneResult {
            highest_deleted: highest_deleted_accounts,
            last_pruned_block: last_changeset_pruned_block,
            pruned_count: pruned_changesets,
            done,
        };
        finalize_history_prune::<_, tables::AccountsHistory, _, _>(
            provider,
            result,
            range_end,
            &limiter,
            ShardedKey::new,
            |a, b| a.key == b.key,
        )
        .map_err(Into::into)
    }

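    /// Prunes account history when changesets are stored in the database (MDBX).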
    fn prune_database<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider<Tx: DbTxMut>,
    {
        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
            input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
        } else {
            input.limiter
        };

        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        // Deleted account changeset keys (account addresses) with the highest block number deleted
        // for that key.
        //
        // The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
        // ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
        // 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
        // ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
        // `max_reorg_depth`, so no OOM is expected here.
        let mut last_changeset_pruned_block = None;
        let mut highest_deleted_accounts = FxHashMap::default();
        let (pruned_changesets, done) =
            provider.tx_ref().prune_table_with_range::<tables::AccountChangeSets>(
                range,
                &mut limiter,
                |_| false,
                |(block_number, account)| {
                    highest_deleted_accounts.insert(account.address, block_number);
                    last_changeset_pruned_block = Some(block_number);
                },
            )?;
        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from database)");

        let result = HistoryPruneResult {
            highest_deleted: highest_deleted_accounts,
            last_pruned_block: last_changeset_pruned_block,
            pruned_count: pruned_changesets,
            done,
        };
        finalize_history_prune::<_, tables::AccountsHistory, _, _>(
            provider,
            result,
            range_end,
            &limiter,
            ShardedKey::new,
            |a, b| a.key == b.key,
        )
        .map_err(Into::into)
    }

    /// Prunes account history when indices are stored in `RocksDB`.
    ///
    /// Reads account changesets from static files and prunes the corresponding
    /// `RocksDB` history shards.
    fn prune_rocksdb<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider + StaticFileProviderFactory + ChangeSetReader + RocksDBProviderFactory,
    {
        // Unlike the MDBX path, we don't divide the limit by 2 because the RocksDB path only
        // prunes history shards (there is no separate changeset table to delete from). The
        // changesets live in static files, which are deleted separately.
        let mut limiter = input.limiter;

        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        let mut highest_deleted_accounts = FxHashMap::default();
        let mut last_changeset_pruned_block = None;
        let mut changesets_processed = 0usize;
        let mut done = true;

        // Walk account changesets from static files using a streaming iterator.
        // For each changeset, track the highest block number seen for each address
        // to determine which history shard entries need pruning.
        let walker = StaticFileAccountChangesetWalker::new(provider, range);
        for result in walker {
            if limiter.is_limit_reached() {
                done = false;
                break;
            }
            let (block_number, changeset) = result?;
            highest_deleted_accounts.insert(changeset.address, block_number);
            last_changeset_pruned_block = Some(block_number);
            changesets_processed += 1;
            limiter.increment_deleted_entries_count();
        }
        trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned account changesets from static files");

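        // If the walk was interrupted mid-block, step the checkpoint back one block so the
        // partially processed block is re-pruned on the next run (pruning is idempotent).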
        let last_changeset_pruned_block = last_changeset_pruned_block
            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
            .unwrap_or(range_end);

        // Prune RocksDB history shards for affected accounts
        let mut deleted_shards = 0usize;
        let mut updated_shards = 0usize;

        // Sort by address for better RocksDB cache locality
        let mut sorted_accounts: Vec<_> = highest_deleted_accounts.into_iter().collect();
        sorted_accounts.sort_unstable_by_key(|(addr, _)| *addr);

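        // Apply all shard deletions/updates in one RocksDB batch, clamping each address's
        // highest pruned block to the checkpoint so shards never advance past it.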
        provider.with_rocksdb_batch(|mut batch| {
            let targets: Vec<_> = sorted_accounts
                .iter()
                .map(|(addr, highest)| (*addr, (*highest).min(last_changeset_pruned_block)))
                .collect();

            let outcomes = batch.prune_account_history_batch(&targets)?;
            deleted_shards = outcomes.deleted;
            updated_shards = outcomes.updated;

            Ok(((), Some(batch.into_inner())))
        })?;
        trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned account history (RocksDB indices)");

        // Delete static file jars only when fully processed. During `provider.commit()`, the
        // RocksDB batch is committed before the MDBX checkpoint. If a crash occurs after the
        // RocksDB commit but before the MDBX commit, on restart the pruner checkpoint indicates
        // the data needs re-pruning while the RocksDB shards are already pruned. This is safe
        // because pruning is idempotent: re-pruning already-pruned shards is a no-op.
        if done {
            provider.static_file_provider().delete_segment_below_block(
                StaticFileSegment::AccountChangeSets,
                last_changeset_pruned_block + 1,
            )?;
        }

        let progress = limiter.progress(done);

        Ok(SegmentOutput {
            progress,
            pruned: changesets_processed + deleted_shards + updated_shards,
            checkpoint: Some(SegmentOutputCheckpoint {
                block_number: Some(last_changeset_pruned_block),
                tx_number: None,
            }),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::ACCOUNT_HISTORY_TABLES_TO_PRUNE;
    use crate::segments::{AccountHistory, PruneInput, PruneLimiter, Segment, SegmentOutput};
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_storage_api::StorageSettingsCache;
    use reth_testing_utils::generators::{
        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
    };
    use std::{collections::BTreeMap, ops::AddAssign};

    #[test]
    fn prune_legacy() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            0..0,
            0..0,
        );
        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry(key.key).or_default().add_assign(1);
                map
            },
        );
        assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        assert_eq!(
            db.table::<tables::AccountChangeSets>().unwrap().len(),
            changesets.iter().flatten().count()
        );

        let original_shards = db.table::<tables::AccountsHistory>().unwrap();

        let test_prune =
            |to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
                let prune_mode = PruneMode::Before(to_block);
                let deleted_entries_limit = 2000;
                let mut limiter =
                    PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
                let input = PruneInput {
                    previous_checkpoint: db
                        .factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::AccountHistory)
                        .unwrap(),
                    to_block,
                    limiter: limiter.clone(),
                };
                let segment = AccountHistory::new(prune_mode);

                let provider = db.factory.database_provider_rw().unwrap();
                provider.set_storage_settings_cache(StorageSettings::v1());
                let result = segment.prune(&provider, input).unwrap();
                limiter.increment_deleted_entries_count_by(result.pruned);

                assert_matches!(
                    result,
                    SegmentOutput {progress, pruned, checkpoint: Some(_)}
                        if (progress, pruned) == expected_result
                );

                segment
                    .save_checkpoint(
                        &provider,
                        result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                    )
                    .unwrap();
                provider.commit().expect("commit");

                let changesets = changesets
                    .iter()
                    .enumerate()
                    .flat_map(|(block_number, changeset)| {
                        changeset.iter().map(move |change| (block_number, change))
                    })
                    .collect::<Vec<_>>();

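                // Index of the first changeset left unpruned after `run` runs: each run may
                // delete up to `deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE`
                // changesets, and only blocks `<= to_block` are pruned.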
                #[expect(clippy::skip_while_next)]
                let pruned = changesets
                    .iter()
                    .enumerate()
                    .skip_while(|(i, (block_number, _))| {
                        *i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
                            *block_number <= to_block as usize
                    })
                    .next()
                    .map(|(i, _)| i)
                    .unwrap_or_default();

                // Skip what we've pruned so far, subtracting one to get the last pruned block
                // number further down
                let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));

                let last_pruned_block_number = pruned_changesets
                    .next()
                    .map(|(block_number, _)| if result.progress.is_finished() {
                        *block_number
                    } else {
                        block_number.saturating_sub(1)
                    } as BlockNumber)
                    .unwrap_or(to_block);

                let pruned_changesets = pruned_changesets.fold(
                    BTreeMap::<_, Vec<_>>::new(),
                    |mut acc, (block_number, change)| {
                        acc.entry(block_number).or_default().push(change);
                        acc
                    },
                );

                assert_eq!(
                    db.table::<tables::AccountChangeSets>().unwrap().len(),
                    pruned_changesets.values().flatten().count()
                );

                let actual_shards = db.table::<tables::AccountsHistory>().unwrap();

                let expected_shards = original_shards
                    .iter()
                    .filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
                    .map(|(key, blocks)| {
                        let new_blocks =
                            blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                        (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                    })
                    .collect::<Vec<_>>();

                assert_eq!(actual_shards, expected_shards);

                assert_eq!(
                    db.factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::AccountHistory)
                        .unwrap(),
                    Some(PruneCheckpoint {
                        block_number: Some(last_pruned_block_number),
                        tx_number: None,
                        prune_mode
                    })
                );
            };

        test_prune(
            998,
            1,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
        );
        test_prune(998, 2, (PruneProgress::Finished, 998));
        test_prune(1400, 3, (PruneProgress::Finished, 804));
    }

    #[test]
    fn prune_rocksdb_path() {
        use reth_db_api::models::ShardedKey;
        use reth_provider::{RocksDBProviderFactory, StaticFileProviderFactory};

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=100,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            0..0,
            0..0,
        );

        db.insert_changesets_to_static_files(changesets.clone(), None)
            .expect("insert changesets to static files");

        let mut account_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::new();
        for (block, changeset) in changesets.iter().enumerate() {
            for (address, _, _) in changeset {
                account_blocks.entry(*address).or_default().push(block as u64);
            }
        }

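        // Seed RocksDB with a single full-range history shard per address; `u64::MAX` marks the
        // last (open-ended) shard for an address.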
        let rocksdb = db.factory.rocksdb_provider();
        let mut batch = rocksdb.batch();
        for (address, block_numbers) in &account_blocks {
            let shard = BlockNumberList::new_pre_sorted(block_numbers.iter().copied());
            batch
                .put::<tables::AccountsHistory>(ShardedKey::new(*address, u64::MAX), &shard)
                .unwrap();
        }
        batch.commit().unwrap();

        for (address, expected_blocks) in &account_blocks {
            let shards = rocksdb.account_history_shards(*address).unwrap();
            assert_eq!(shards.len(), 1);
            assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), *expected_blocks);
        }

        let to_block: BlockNumber = 50;
        let prune_mode = PruneMode::Before(to_block);
        let input =
            PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
        let segment = AccountHistory::new(prune_mode);

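        // Enable v2 storage settings so `prune` dispatches to the RocksDB path.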
        db.factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = db.factory.database_provider_rw().unwrap();
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        assert_matches!(
            result,
            SegmentOutput { progress: PruneProgress::Finished, pruned, checkpoint: Some(_) }
                if pruned > 0
        );

        for (address, original_blocks) in &account_blocks {
            let shards = rocksdb.account_history_shards(*address).unwrap();

            let expected_blocks: Vec<u64> =
                original_blocks.iter().copied().filter(|b| *b > to_block).collect();

            if expected_blocks.is_empty() {
                assert!(
                    shards.is_empty(),
                    "Expected no shards for address {address:?} after pruning"
                );
            } else {
                assert_eq!(shards.len(), 1, "Expected 1 shard for address {address:?}");
                assert_eq!(
                    shards[0].1.iter().collect::<Vec<_>>(),
                    expected_blocks,
                    "Shard blocks mismatch for address {address:?}"
                );
            }
        }

        let static_file_provider = db.factory.static_file_provider();
        let highest_block = static_file_provider.get_highest_static_file_block(
            reth_static_file_types::StaticFileSegment::AccountChangeSets,
        );
        if let Some(block) = highest_block {
            assert!(
                block > to_block,
                "Static files should only contain blocks above to_block ({to_block}), got {block}"
            );
        }
    }

    /// Tests that when a limiter stops mid-block (with multiple changes for the same block),
    /// the checkpoint is set to `block_number - 1` to avoid dangling index entries.
    #[test]
    fn prune_partial_progress_mid_block() {
        use alloy_primitives::{Address, U256};
        use reth_primitives_traits::Account;
        use reth_testing_utils::generators::ChangeSet;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Create blocks 0..=10
        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Create specific changesets where block 5 has 4 account changes
        let addr1 = Address::with_last_byte(1);
        let addr2 = Address::with_last_byte(2);
        let addr3 = Address::with_last_byte(3);
        let addr4 = Address::with_last_byte(4);
        let addr5 = Address::with_last_byte(5);

        let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };

        // Build changesets: blocks 0-4 have 1 change each, block 5 has 4 changes, block 6 has 1
        let changesets: Vec<ChangeSet> = vec![
            vec![(addr1, account, vec![])], // block 0
            vec![(addr1, account, vec![])], // block 1
            vec![(addr1, account, vec![])], // block 2
            vec![(addr1, account, vec![])], // block 3
            vec![(addr1, account, vec![])], // block 4
            // block 5: 4 different account changes (sorted by address for consistency)
            vec![
                (addr1, account, vec![]),
                (addr2, account, vec![]),
                (addr3, account, vec![]),
                (addr4, account, vec![]),
            ],
            vec![(addr5, account, vec![])], // block 6
        ];

        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        // Total changesets: 5 (blocks 0-4) + 4 (block 5) + 1 (block 6) = 10
        assert_eq!(
            db.table::<tables::AccountChangeSets>().unwrap().len(),
            changesets.iter().flatten().count()
        );

        let prune_mode = PruneMode::Before(10);

        // With ACCOUNT_HISTORY_TABLES_TO_PRUNE = 2, a deleted-entries limit of 14 gives
        // 14 / 2 = 7 changeset slots per run. Blocks 0-4 consume 5 slots, leaving 2 for block 5
        // (which has 4 changes), so the pruner stops mid-block 5.
        let deleted_entries_limit = 14;
        let limiter = PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);

        let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
        let segment = AccountHistory::new(prune_mode);

        let provider = db.factory.database_provider_rw().unwrap();
        provider.set_storage_settings_cache(StorageSettings::v1());
        let result = segment.prune(&provider, input).unwrap();

        // Should report that there's more data
        assert!(!result.progress.is_finished(), "Expected HasMoreData since we stopped mid-block");

        // Save checkpoint and commit
        segment
            .save_checkpoint(&provider, result.checkpoint.unwrap().as_prune_checkpoint(prune_mode))
            .unwrap();
        provider.commit().expect("commit");

        // Verify checkpoint is set to block 4 (not 5), since block 5 is incomplete
        let checkpoint = db
            .factory
            .provider()
            .unwrap()
            .get_prune_checkpoint(PruneSegment::AccountHistory)
            .unwrap()
            .expect("checkpoint should exist");

        assert_eq!(
            checkpoint.block_number,
            Some(4),
            "Checkpoint should be block 4 (block before incomplete block 5)"
        );

        // Verify remaining changesets: blocks 0-4 are pruned, so block 5 (4 entries) and block 6
        // (1 entry) remain, though some of block 5 may already be pruned since we stopped
        // mid-block. The checkpoint is 4, so a re-run re-processes from block 5.
        let remaining_changesets = db.table::<tables::AccountChangeSets>().unwrap();
        assert!(
            !remaining_changesets.is_empty(),
            "Should have remaining changesets for blocks 5-6"
        );

        // Verify there are no dangling history indices for blocks that weren't fully pruned:
        // every remaining shard must have a highest block number above the checkpoint (4).
        let history = db.table::<tables::AccountsHistory>().unwrap();
        for (key, _blocks) in &history {
            assert!(
                key.highest_block_number > 4,
                "Found stale history shard with highest_block_number {} <= checkpoint 4",
                key.highest_block_number
            );
        }

        // Run prune again to completion; this should finish processing blocks 5 and 6
        let input2 = PruneInput {
            previous_checkpoint: Some(checkpoint),
            to_block: 10,
            limiter: PruneLimiter::default().set_deleted_entries_limit(100), // high limit
        };

        let provider2 = db.factory.database_provider_rw().unwrap();
        provider2.set_storage_settings_cache(StorageSettings::v1());
        let result2 = segment.prune(&provider2, input2).unwrap();

        assert!(result2.progress.is_finished(), "Second run should complete");

        segment
            .save_checkpoint(
                &provider2,
                result2.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
            )
            .unwrap();
        provider2.commit().expect("commit");

        // Verify final checkpoint
        let final_checkpoint = db
            .factory
            .provider()
            .unwrap()
            .get_prune_checkpoint(PruneSegment::AccountHistory)
            .unwrap()
            .expect("checkpoint should exist");

        // Should now be at block 6 (the last block with changesets)
        assert_eq!(final_checkpoint.block_number, Some(6), "Final checkpoint should be at block 6");

        // All changesets should be pruned
        let final_changesets = db.table::<tables::AccountChangeSets>().unwrap();
        assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
    }
}