1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{
4 user::history::{finalize_history_prune, HistoryPruneResult},
5 PruneInput, Segment,
6 },
7 PrunerError,
8};
9use alloy_primitives::BlockNumber;
10use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
11use reth_provider::{
12 changeset_walker::StaticFileAccountChangesetWalker, DBProvider, EitherWriter,
13 RocksDBProviderFactory, StaticFileProviderFactory,
14};
15use reth_prune_types::{
16 PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
17};
18use reth_static_file_types::StaticFileSegment;
19use reth_storage_api::{ChangeSetReader, StorageSettingsCache};
20use rustc_hash::FxHashMap;
21use tracing::{instrument, trace};
22
/// Number of tables the caller-supplied deletion budget is split across when pruning account
/// history: the changesets themselves and the `AccountsHistory` index shards.
const ACCOUNT_HISTORY_TABLES_TO_PRUNE: usize = 2;
28
/// Prune segment responsible for historical account state: account changesets and the
/// `AccountsHistory` index.
#[derive(Debug)]
pub struct AccountHistory {
    // Prune mode this segment was configured with; returned verbatim from `Segment::mode`.
    mode: PruneMode,
}
33
34impl AccountHistory {
35 pub const fn new(mode: PruneMode) -> Self {
36 Self { mode }
37 }
38}
39
40impl<Provider> Segment<Provider> for AccountHistory
41where
42 Provider: DBProvider<Tx: DbTxMut>
43 + StaticFileProviderFactory
44 + StorageSettingsCache
45 + ChangeSetReader
46 + RocksDBProviderFactory,
47{
48 fn segment(&self) -> PruneSegment {
49 PruneSegment::AccountHistory
50 }
51
52 fn mode(&self) -> Option<PruneMode> {
53 Some(self.mode)
54 }
55
56 fn purpose(&self) -> PrunePurpose {
57 PrunePurpose::User
58 }
59
60 #[instrument(
61 name = "AccountHistory::prune",
62 target = "pruner",
63 skip(self, provider),
64 ret(level = "trace")
65 )]
66 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
67 let range = match input.get_next_block_range() {
68 Some(range) => range,
69 None => {
70 trace!(target: "pruner", "No account history to prune");
71 return Ok(SegmentOutput::done())
72 }
73 };
74 let range_end = *range.end();
75
76 #[cfg(all(unix, feature = "rocksdb"))]
78 if provider.cached_storage_settings().storage_v2 {
79 return self.prune_rocksdb(provider, input, range, range_end);
80 }
81
82 if EitherWriter::account_changesets_destination(provider).is_static_file() {
84 self.prune_static_files(provider, input, range, range_end)
85 } else {
86 self.prune_database(provider, input, range, range_end)
87 }
88 }
89}
90
impl AccountHistory {
    /// Prunes account changesets stored in static files, then updates the
    /// [`tables::AccountsHistory`] index via [`finalize_history_prune`].
    ///
    /// `range_end` is the last block of the requested prune range and is used as the history
    /// index cutoff when finalizing.
    fn prune_static_files<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory + ChangeSetReader,
    {
        // Split the deletion budget evenly between the two tables this segment touches
        // (changesets here, history shards inside `finalize_history_prune`).
        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
            input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
        } else {
            input.limiter
        };

        // Budget already exhausted before doing any work: report "not done" and keep the
        // previous checkpoint so the next run resumes from the same spot.
        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        // Highest pruned block seen per address — drives history-shard trimming later.
        let mut highest_deleted_accounts = FxHashMap::default();
        let mut last_changeset_pruned_block = None;
        let mut pruned_changesets = 0;
        let mut done = true;

        let walker = StaticFileAccountChangesetWalker::new(provider, range);
        for result in walker {
            // Check the budget *before* consuming the next entry so the entry that would
            // exceed the limit is left for the next run.
            if limiter.is_limit_reached() {
                done = false;
                break;
            }
            let (block_number, changeset) = result?;
            highest_deleted_accounts.insert(changeset.address, block_number);
            last_changeset_pruned_block = Some(block_number);
            pruned_changesets += 1;
            limiter.increment_deleted_entries_count();
        }

        // Static files can only be truncated from the bottom, so the actual deletion happens
        // only once the whole range has been walked; a partial walk defers it.
        if done && let Some(last_block) = last_changeset_pruned_block {
            provider
                .static_file_provider()
                .delete_segment_below_block(StaticFileSegment::AccountChangeSets, last_block + 1)?;
        }
        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from static files)");

        let result = HistoryPruneResult {
            highest_deleted: highest_deleted_accounts,
            last_pruned_block: last_changeset_pruned_block,
            pruned_count: pruned_changesets,
            done,
        };
        // Trim `AccountsHistory` shards to match the pruned changesets and build the output.
        finalize_history_prune::<_, tables::AccountsHistory, _, _>(
            provider,
            result,
            range_end,
            &limiter,
            ShardedKey::new,
            |a, b| a.key == b.key,
        )
        .map_err(Into::into)
    }

    /// Prunes account changesets stored in the database ([`tables::AccountChangeSets`]), then
    /// updates the [`tables::AccountsHistory`] index via [`finalize_history_prune`].
    fn prune_database<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider<Tx: DbTxMut>,
    {
        // Split the deletion budget evenly between changesets and history shards, mirroring
        // `prune_static_files`.
        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
            input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
        } else {
            input.limiter
        };

        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        let mut last_changeset_pruned_block = None;
        // Highest pruned block seen per address — drives history-shard trimming later.
        let mut highest_deleted_accounts = FxHashMap::default();
        // Delete rows in-range; the `|_| false` skip-filter deletes every visited row, and the
        // callback records per-address progress for the history index.
        let (pruned_changesets, done) =
            provider.tx_ref().prune_table_with_range::<tables::AccountChangeSets>(
                range,
                &mut limiter,
                |_| false,
                |(block_number, account)| {
                    highest_deleted_accounts.insert(account.address, block_number);
                    last_changeset_pruned_block = Some(block_number);
                },
            )?;
        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from database)");

        let result = HistoryPruneResult {
            highest_deleted: highest_deleted_accounts,
            last_pruned_block: last_changeset_pruned_block,
            pruned_count: pruned_changesets,
            done,
        };
        finalize_history_prune::<_, tables::AccountsHistory, _, _>(
            provider,
            result,
            range_end,
            &limiter,
            ShardedKey::new,
            |a, b| a.key == b.key,
        )
        .map_err(Into::into)
    }

    /// Storage-v2 path: scans account changesets from static files, prunes the RocksDB-backed
    /// `AccountsHistory` shards in one batch, then truncates the changeset static files once
    /// the whole range has been processed.
    #[cfg(all(unix, feature = "rocksdb"))]
    fn prune_rocksdb<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider + StaticFileProviderFactory + ChangeSetReader + RocksDBProviderFactory,
    {
        // NOTE(review): unlike the MDBX paths, the budget is NOT divided by
        // `ACCOUNT_HISTORY_TABLES_TO_PRUNE` here — presumably because shard work is applied as
        // a single RocksDB batch rather than counted per-entry; confirm this is intentional.
        let mut limiter = input.limiter;

        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        // Highest scanned block per address; fed into the batched shard prune below.
        let mut highest_deleted_accounts = FxHashMap::default();
        let mut last_changeset_pruned_block = None;
        let mut changesets_processed = 0usize;
        let mut done = true;

        // First pass only scans (static files are truncated at the end, not per-entry).
        let walker = StaticFileAccountChangesetWalker::new(provider, range);
        for result in walker {
            if limiter.is_limit_reached() {
                done = false;
                break;
            }
            let (block_number, changeset) = result?;
            highest_deleted_accounts.insert(changeset.address, block_number);
            last_changeset_pruned_block = Some(block_number);
            changesets_processed += 1;
            limiter.increment_deleted_entries_count();
        }
        trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned account changesets from static files");

        // If interrupted, step back one block so the checkpoint never lands on a block whose
        // changesets were only partially processed; with nothing scanned, fall back to the
        // range end (the whole range was empty).
        let last_changeset_pruned_block = last_changeset_pruned_block
            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
            .unwrap_or(range_end);

        let mut deleted_shards = 0usize;
        let mut updated_shards = 0usize;

        // Sort addresses for deterministic batch ordering.
        let mut sorted_accounts: Vec<_> = highest_deleted_accounts.into_iter().collect();
        sorted_accounts.sort_unstable_by_key(|(addr, _)| *addr);

        provider.with_rocksdb_batch(|mut batch| {
            // Cap each address's prune target at the effective checkpoint block so entries
            // from a partially-processed block are not trimmed from the index.
            let targets: Vec<_> = sorted_accounts
                .iter()
                .map(|(addr, highest)| (*addr, (*highest).min(last_changeset_pruned_block)))
                .collect();

            let outcomes = batch.prune_account_history_batch(&targets)?;
            deleted_shards = outcomes.deleted;
            updated_shards = outcomes.updated;

            Ok(((), Some(batch.into_inner())))
        })?;
        trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned account history (RocksDB indices)");

        // Truncate changeset static files only after a full pass over the range.
        if done {
            provider.static_file_provider().delete_segment_below_block(
                StaticFileSegment::AccountChangeSets,
                last_changeset_pruned_block + 1,
            )?;
        }

        let progress = limiter.progress(done);

        Ok(SegmentOutput {
            progress,
            // Report scanned changesets plus every shard the batch deleted or rewrote.
            pruned: changesets_processed + deleted_shards + updated_shards,
            checkpoint: Some(SegmentOutputCheckpoint {
                block_number: Some(last_changeset_pruned_block),
                tx_number: None,
            }),
        })
    }
}
330
#[cfg(test)]
mod tests {
    use super::ACCOUNT_HISTORY_TABLES_TO_PRUNE;
    use crate::segments::{AccountHistory, PruneInput, PruneLimiter, Segment, SegmentOutput};
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_storage_api::StorageSettingsCache;
    use reth_testing_utils::generators::{
        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
    };
    use std::{collections::BTreeMap, ops::AddAssign};

    /// Legacy (v1) path: changesets live in the database and are pruned through
    /// `prune_database`, across multiple limited runs.
    #[test]
    fn prune_legacy() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            0..0,
            0..0,
        );
        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        // Sanity check: at least one account's history spans multiple shards, so shard
        // trimming is actually exercised.
        let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry(key.key).or_default().add_assign(1);
                map
            },
        );
        assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        assert_eq!(
            db.table::<tables::AccountChangeSets>().unwrap().len(),
            changesets.iter().flatten().count()
        );

        let original_shards = db.table::<tables::AccountsHistory>().unwrap();

        // Runs one prune pass and verifies output, remaining changesets, remaining history
        // shards, and the stored checkpoint. `run` is the 1-based pass number, used to
        // recompute how much of the flattened changeset list should be gone by now.
        let test_prune =
            |to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
                let prune_mode = PruneMode::Before(to_block);
                let deleted_entries_limit = 2000;
                let mut limiter =
                    PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
                let input = PruneInput {
                    previous_checkpoint: db
                        .factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::AccountHistory)
                        .unwrap(),
                    to_block,
                    limiter: limiter.clone(),
                };
                let segment = AccountHistory::new(prune_mode);

                let provider = db.factory.database_provider_rw().unwrap();
                provider.set_storage_settings_cache(StorageSettings::v1());
                let result = segment.prune(&provider, input).unwrap();
                limiter.increment_deleted_entries_count_by(result.pruned);

                assert_matches!(
                    result,
                    SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
                );

                segment
                    .save_checkpoint(
                        &provider,
                        result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                    )
                    .unwrap();
                provider.commit().expect("commit");

                // Flatten to (block_number, change) pairs in insertion order.
                let changesets = changesets
                    .iter()
                    .enumerate()
                    .flat_map(|(block_number, changeset)| {
                        changeset.iter().map(move |change| (block_number, change))
                    })
                    .collect::<Vec<_>>();

                // Index of the first entry still unpruned: each run deletes at most
                // `limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE` changesets, bounded by `to_block`.
                #[expect(clippy::skip_while_next)]
                let pruned = changesets
                    .iter()
                    .enumerate()
                    .skip_while(|(i, (block_number, _))| {
                        *i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
                            *block_number <= to_block as usize
                    })
                    .next()
                    .map(|(i, _)| i)
                    .unwrap_or_default();

                let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));

                // On an interrupted run the checkpoint steps back to the block before the
                // partially-pruned one.
                let last_pruned_block_number = pruned_changesets
                    .next()
                    .map(|(block_number, _)| if result.progress.is_finished() {
                        *block_number
                    } else {
                        block_number.saturating_sub(1)
                    } as BlockNumber)
                    .unwrap_or(to_block);

                let pruned_changesets = pruned_changesets.fold(
                    BTreeMap::<_, Vec<_>>::new(),
                    |mut acc, (block_number, change)| {
                        acc.entry(block_number).or_default().push(change);
                        acc
                    },
                );

                assert_eq!(
                    db.table::<tables::AccountChangeSets>().unwrap().len(),
                    pruned_changesets.values().flatten().count()
                );

                let actual_shards = db.table::<tables::AccountsHistory>().unwrap();

                // Surviving shards: only those above the checkpoint, with their block lists
                // trimmed past the checkpoint.
                let expected_shards = original_shards
                    .iter()
                    .filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
                    .map(|(key, blocks)| {
                        let new_blocks =
                            blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                        (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                    })
                    .collect::<Vec<_>>();

                assert_eq!(actual_shards, expected_shards);

                assert_eq!(
                    db.factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::AccountHistory)
                        .unwrap(),
                    Some(PruneCheckpoint {
                        block_number: Some(last_pruned_block_number),
                        tx_number: None,
                        prune_mode
                    })
                );
            };

        test_prune(
            998,
            1,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
        );
        test_prune(998, 2, (PruneProgress::Finished, 998));
        test_prune(1400, 3, (PruneProgress::Finished, 804));
    }

    /// Static-file path under storage v2 settings (without the rocksdb feature, so the
    /// `prune_static_files` branch is taken instead of the RocksDB one).
    #[test]
    #[cfg(not(all(unix, feature = "rocksdb")))]
    fn prune_static_file() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            0..0,
            0..0,
        );

        // Changesets go into static files here, not the database.
        db.insert_changesets_to_static_files(changesets.clone(), None)
            .expect("insert changesets to static files");
        db.insert_history(changesets.clone(), None).expect("insert history");

        // Sanity check: at least one account's history spans multiple shards.
        let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry(key.key).or_default().add_assign(1);
                map
            },
        );
        assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        let original_shards = db.table::<tables::AccountsHistory>().unwrap();

        // Same structure as `prune_legacy`'s helper, but with v2 settings and no database
        // changeset-count assertions (changesets live in static files).
        let test_prune =
            |to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
                let prune_mode = PruneMode::Before(to_block);
                let deleted_entries_limit = 2000;
                let mut limiter =
                    PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
                let input = PruneInput {
                    previous_checkpoint: db
                        .factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::AccountHistory)
                        .unwrap(),
                    to_block,
                    limiter: limiter.clone(),
                };
                let segment = AccountHistory::new(prune_mode);

                let provider = db.factory.database_provider_rw().unwrap();
                provider.set_storage_settings_cache(StorageSettings::v2());
                let result = segment.prune(&provider, input).unwrap();
                limiter.increment_deleted_entries_count_by(result.pruned);

                assert_matches!(
                    result,
                    SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
                );

                segment
                    .save_checkpoint(
                        &provider,
                        result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                    )
                    .unwrap();
                provider.commit().expect("commit");

                let changesets = changesets
                    .iter()
                    .enumerate()
                    .flat_map(|(block_number, changeset)| {
                        changeset.iter().map(move |change| (block_number, change))
                    })
                    .collect::<Vec<_>>();

                // Index of the first entry still unpruned after `run` limited passes.
                #[expect(clippy::skip_while_next)]
                let pruned = changesets
                    .iter()
                    .enumerate()
                    .skip_while(|(i, (block_number, _))| {
                        *i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
                            *block_number <= to_block as usize
                    })
                    .next()
                    .map(|(i, _)| i)
                    .unwrap_or_default();

                let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));

                let last_pruned_block_number = pruned_changesets
                    .next()
                    .map(|(block_number, _)| {
                        (if result.progress.is_finished() {
                            *block_number
                        } else {
                            block_number.saturating_sub(1)
                        }) as BlockNumber
                    })
                    .unwrap_or(to_block);

                let actual_shards = db.table::<tables::AccountsHistory>().unwrap();

                let expected_shards = original_shards
                    .iter()
                    .filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
                    .map(|(key, blocks)| {
                        let new_blocks =
                            blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                        (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                    })
                    .collect::<Vec<_>>();

                assert_eq!(actual_shards, expected_shards);

                assert_eq!(
                    db.factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::AccountHistory)
                        .unwrap(),
                    Some(PruneCheckpoint {
                        block_number: Some(last_pruned_block_number),
                        tx_number: None,
                        prune_mode
                    })
                );
            };

        test_prune(
            998,
            1,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
        );
        test_prune(998, 2, (PruneProgress::Finished, 1000));
        test_prune(1400, 3, (PruneProgress::Finished, 804));
    }

    /// RocksDB (storage v2) path: history shards in RocksDB are pruned/trimmed and the
    /// changeset static files are truncated below the checkpoint.
    #[cfg(all(unix, feature = "rocksdb"))]
    #[test]
    fn prune_rocksdb_path() {
        use reth_db_api::models::ShardedKey;
        use reth_provider::{RocksDBProviderFactory, StaticFileProviderFactory};

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=100,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            0..0,
            0..0,
        );

        db.insert_changesets_to_static_files(changesets.clone(), None)
            .expect("insert changesets to static files");

        // Expected per-address block lists, derived directly from the generated changesets.
        let mut account_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::new();
        for (block, changeset) in changesets.iter().enumerate() {
            for (address, _, _) in changeset {
                account_blocks.entry(*address).or_default().push(block as u64);
            }
        }

        // Seed the RocksDB history index with one full shard per address.
        let rocksdb = db.factory.rocksdb_provider();
        let mut batch = rocksdb.batch();
        for (address, block_numbers) in &account_blocks {
            let shard = BlockNumberList::new_pre_sorted(block_numbers.iter().copied());
            batch
                .put::<tables::AccountsHistory>(ShardedKey::new(*address, u64::MAX), &shard)
                .unwrap();
        }
        batch.commit().unwrap();

        // Pre-prune sanity check: each address has exactly its seeded shard.
        for (address, expected_blocks) in &account_blocks {
            let shards = rocksdb.account_history_shards(*address).unwrap();
            assert_eq!(shards.len(), 1);
            assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), *expected_blocks);
        }

        let to_block: BlockNumber = 50;
        let prune_mode = PruneMode::Before(to_block);
        let input =
            PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
        let segment = AccountHistory::new(prune_mode);

        // v2 settings route `prune` through `prune_rocksdb`.
        db.factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = db.factory.database_provider_rw().unwrap();
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        assert_matches!(
            result,
            SegmentOutput { progress: PruneProgress::Finished, pruned, checkpoint: Some(_) }
            if pruned > 0
        );

        // Shards must now only contain blocks above `to_block`; addresses with nothing left
        // should have their shard removed entirely.
        for (address, original_blocks) in &account_blocks {
            let shards = rocksdb.account_history_shards(*address).unwrap();

            let expected_blocks: Vec<u64> =
                original_blocks.iter().copied().filter(|b| *b > to_block).collect();

            if expected_blocks.is_empty() {
                assert!(
                    shards.is_empty(),
                    "Expected no shards for address {address:?} after pruning"
                );
            } else {
                assert_eq!(shards.len(), 1, "Expected 1 shard for address {address:?}");
                assert_eq!(
                    shards[0].1.iter().collect::<Vec<_>>(),
                    expected_blocks,
                    "Shard blocks mismatch for address {address:?}"
                );
            }
        }

        // Changeset static files should be truncated below the prune target.
        let static_file_provider = db.factory.static_file_provider();
        let highest_block = static_file_provider.get_highest_static_file_block(
            reth_static_file_types::StaticFileSegment::AccountChangeSets,
        );
        if let Some(block) = highest_block {
            assert!(
                block > to_block,
                "Static files should only contain blocks above to_block ({to_block}), got {block}"
            );
        }
    }

    /// Verifies that a run interrupted in the middle of a block checkpoints the *previous*
    /// block, leaves no stale history shards behind, and that a follow-up run completes.
    #[test]
    fn prune_partial_progress_mid_block() {
        use alloy_primitives::{Address, U256};
        use reth_primitives_traits::Account;
        use reth_testing_utils::generators::ChangeSet;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let addr1 = Address::with_last_byte(1);
        let addr2 = Address::with_last_byte(2);
        let addr3 = Address::with_last_byte(3);
        let addr4 = Address::with_last_byte(4);
        let addr5 = Address::with_last_byte(5);

        let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };

        // Blocks 0-4 hold one change each, block 5 holds four, block 6 holds one — so a small
        // budget runs out partway through block 5.
        let changesets: Vec<ChangeSet> = vec![
            vec![(addr1, account, vec![])],
            vec![(addr1, account, vec![])],
            vec![(addr1, account, vec![])],
            vec![(addr1, account, vec![])],
            vec![(addr1, account, vec![])],
            vec![
                (addr1, account, vec![]),
                (addr2, account, vec![]),
                (addr3, account, vec![]),
                (addr4, account, vec![]),
            ],
            vec![(addr5, account, vec![])],
        ];

        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        assert_eq!(
            db.table::<tables::AccountChangeSets>().unwrap().len(),
            changesets.iter().flatten().count()
        );

        let prune_mode = PruneMode::Before(10);

        // 14 / ACCOUNT_HISTORY_TABLES_TO_PRUNE = 7 changeset deletions: all of blocks 0-4
        // (5 entries) plus only 2 of block 5's 4 entries — an intentional mid-block stop.
        let deleted_entries_limit = 14;
        let limiter = PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);

        let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
        let segment = AccountHistory::new(prune_mode);

        let provider = db.factory.database_provider_rw().unwrap();
        provider.set_storage_settings_cache(StorageSettings::v1());
        let result = segment.prune(&provider, input).unwrap();

        assert!(!result.progress.is_finished(), "Expected HasMoreData since we stopped mid-block");

        segment
            .save_checkpoint(&provider, result.checkpoint.unwrap().as_prune_checkpoint(prune_mode))
            .unwrap();
        provider.commit().expect("commit");

        let checkpoint = db
            .factory
            .provider()
            .unwrap()
            .get_prune_checkpoint(PruneSegment::AccountHistory)
            .unwrap()
            .expect("checkpoint should exist");

        // The checkpoint must not land on the partially-pruned block 5.
        assert_eq!(
            checkpoint.block_number,
            Some(4),
            "Checkpoint should be block 4 (block before incomplete block 5)"
        );

        let remaining_changesets = db.table::<tables::AccountChangeSets>().unwrap();
        assert!(
            !remaining_changesets.is_empty(),
            "Should have remaining changesets for blocks 5-6"
        );

        // Every surviving history shard must still reference blocks above the checkpoint.
        let history = db.table::<tables::AccountsHistory>().unwrap();
        for (key, _blocks) in &history {
            assert!(
                key.highest_block_number > 4,
                "Found stale history shard with highest_block_number {} <= checkpoint 4",
                key.highest_block_number
            );
        }

        // Second run with a generous budget resumes from the checkpoint and finishes.
        let input2 = PruneInput {
            previous_checkpoint: Some(checkpoint),
            to_block: 10,
            limiter: PruneLimiter::default().set_deleted_entries_limit(100),
        };

        let provider2 = db.factory.database_provider_rw().unwrap();
        provider2.set_storage_settings_cache(StorageSettings::v1());
        let result2 = segment.prune(&provider2, input2).unwrap();

        assert!(result2.progress.is_finished(), "Second run should complete");

        segment
            .save_checkpoint(
                &provider2,
                result2.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
            )
            .unwrap();
        provider2.commit().expect("commit");

        let final_checkpoint = db
            .factory
            .provider()
            .unwrap()
            .get_prune_checkpoint(PruneSegment::AccountHistory)
            .unwrap()
            .expect("checkpoint should exist");

        // Block 6 is the last block that actually has changesets.
        assert_eq!(final_checkpoint.block_number, Some(6), "Final checkpoint should be at block 6");

        let final_changesets = db.table::<tables::AccountChangeSets>().unwrap();
        assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
    }
}