1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{
4 user::history::{finalize_history_prune, HistoryPruneResult},
5 PruneInput, Segment,
6 },
7 PrunerError,
8};
9use alloy_primitives::BlockNumber;
10use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
11use reth_provider::{
12 changeset_walker::StaticFileAccountChangesetWalker, DBProvider, EitherWriter,
13 RocksDBProviderFactory, StaticFileProviderFactory,
14};
15use reth_prune_types::{
16 PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
17};
18use reth_static_file_types::StaticFileSegment;
19use reth_storage_api::{ChangeSetReader, StorageSettingsCache};
20use rustc_hash::FxHashMap;
21use tracing::{instrument, trace};
22
/// Number of tables this segment touches per run (`AccountChangeSets` + `AccountsHistory`).
/// The deleted-entries budget is divided by this so both passes share the limit evenly.
const ACCOUNT_HISTORY_TABLES_TO_PRUNE: usize = 2;
28
/// Prune segment for account history: removes `AccountChangeSets` entries (from the
/// database or static files, depending on the storage layout) together with the
/// corresponding `AccountsHistory` index shards.
#[derive(Debug)]
pub struct AccountHistory {
    // Configured prune mode for this segment (e.g. `PruneMode::Before(block)`).
    mode: PruneMode,
}
33
impl AccountHistory {
    /// Creates a new account history prune segment with the given prune mode.
    pub const fn new(mode: PruneMode) -> Self {
        Self { mode }
    }
}
39
40impl<Provider> Segment<Provider> for AccountHistory
41where
42 Provider: DBProvider<Tx: DbTxMut>
43 + StaticFileProviderFactory
44 + StorageSettingsCache
45 + ChangeSetReader
46 + RocksDBProviderFactory,
47{
48 fn segment(&self) -> PruneSegment {
49 PruneSegment::AccountHistory
50 }
51
52 fn mode(&self) -> Option<PruneMode> {
53 Some(self.mode)
54 }
55
56 fn purpose(&self) -> PrunePurpose {
57 PrunePurpose::User
58 }
59
60 #[instrument(
61 name = "AccountHistory::prune",
62 target = "pruner",
63 skip(self, provider),
64 ret(level = "trace")
65 )]
66 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
67 let range = match input.get_next_block_range() {
68 Some(range) => range,
69 None => {
70 trace!(target: "pruner", "No account history to prune");
71 return Ok(SegmentOutput::done())
72 }
73 };
74 let range_end = *range.end();
75
76 if provider.cached_storage_settings().storage_v2 {
78 return self.prune_rocksdb(provider, input, range, range_end);
79 }
80
81 if EitherWriter::account_changesets_destination(provider).is_static_file() {
83 self.prune_static_files(provider, input, range, range_end)
84 } else {
85 self.prune_database(provider, input, range, range_end)
86 }
87 }
88}
89
90impl AccountHistory {
    /// Prunes account history when changesets live in static files (legacy, pre-v2 storage).
    ///
    /// Walks the `AccountChangeSets` static file segment over `range`, recording the highest
    /// visited block per account, truncates the walked static-file blocks (only once the
    /// whole range was covered), then delegates `AccountsHistory` shard pruning to
    /// [`finalize_history_prune`].
    fn prune_static_files<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory + ChangeSetReader,
    {
        // Split the deleted-entries budget across the two tables this segment touches
        // (changesets + history index shards).
        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
            input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
        } else {
            input.limiter
        };

        // Budget already exhausted before any work: report "not done" and carry the
        // previous checkpoint forward so the next run resumes from the same spot.
        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        // Highest pruned block per account address; drives the shard pruning below.
        let mut highest_deleted_accounts = FxHashMap::default();
        let mut last_changeset_pruned_block = None;
        let mut pruned_changesets = 0;
        let mut done = true;

        let walker = StaticFileAccountChangesetWalker::new(provider, range);
        for result in walker {
            if limiter.is_limit_reached() {
                // Budget ran out mid-range; the remainder is left for the next run.
                done = false;
                break;
            }
            let (block_number, changeset) = result?;
            highest_deleted_accounts.insert(changeset.address, block_number);
            last_changeset_pruned_block = Some(block_number);
            pruned_changesets += 1;
            limiter.increment_deleted_entries_count();
        }

        // Static file segments are truncated at block granularity, so physical deletion
        // only happens once the entire range was walked.
        if done && let Some(last_block) = last_changeset_pruned_block {
            provider
                .static_file_provider()
                .delete_segment_below_block(StaticFileSegment::AccountChangeSets, last_block + 1)?;
        }
        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from static files)");

        // Prune the `AccountsHistory` index shards for the affected accounts.
        let result = HistoryPruneResult {
            highest_deleted: highest_deleted_accounts,
            last_pruned_block: last_changeset_pruned_block,
            pruned_count: pruned_changesets,
            done,
        };
        finalize_history_prune::<_, tables::AccountsHistory, _, _>(
            provider,
            result,
            range_end,
            &limiter,
            ShardedKey::new,
            |a, b| a.key == b.key,
        )
        .map_err(Into::into)
    }
167
168 fn prune_database<Provider>(
169 &self,
170 provider: &Provider,
171 input: PruneInput,
172 range: std::ops::RangeInclusive<BlockNumber>,
173 range_end: BlockNumber,
174 ) -> Result<SegmentOutput, PrunerError>
175 where
176 Provider: DBProvider<Tx: DbTxMut>,
177 {
178 let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
179 input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
180 } else {
181 input.limiter
182 };
183
184 if limiter.is_limit_reached() {
185 return Ok(SegmentOutput::not_done(
186 limiter.interrupt_reason(),
187 input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
188 ))
189 }
190
191 let mut last_changeset_pruned_block = None;
200 let mut highest_deleted_accounts = FxHashMap::default();
201 let (pruned_changesets, done) =
202 provider.tx_ref().prune_table_with_range::<tables::AccountChangeSets>(
203 range,
204 &mut limiter,
205 |_| false,
206 |(block_number, account)| {
207 highest_deleted_accounts.insert(account.address, block_number);
208 last_changeset_pruned_block = Some(block_number);
209 },
210 )?;
211 trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from database)");
212
213 let result = HistoryPruneResult {
214 highest_deleted: highest_deleted_accounts,
215 last_pruned_block: last_changeset_pruned_block,
216 pruned_count: pruned_changesets,
217 done,
218 };
219 finalize_history_prune::<_, tables::AccountsHistory, _, _>(
220 provider,
221 result,
222 range_end,
223 &limiter,
224 ShardedKey::new,
225 |a, b| a.key == b.key,
226 )
227 .map_err(Into::into)
228 }
229
    /// Prunes account history under the v2 storage layout: changesets are read from static
    /// files and the `AccountsHistory` index lives in RocksDB.
    fn prune_rocksdb<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        range: std::ops::RangeInclusive<BlockNumber>,
        range_end: BlockNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider + StaticFileProviderFactory + ChangeSetReader + RocksDBProviderFactory,
    {
        // NOTE(review): unlike the legacy paths, the budget is not divided by
        // ACCOUNT_HISTORY_TABLES_TO_PRUNE here — presumably because shard work is done in a
        // single RocksDB batch rather than a second counted pass; confirm this is intentional.
        let mut limiter = input.limiter;

        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        // Highest scanned block per account address.
        let mut highest_deleted_accounts = FxHashMap::default();
        let mut last_changeset_pruned_block = None;
        let mut changesets_processed = 0usize;
        let mut done = true;

        // Scan the changesets in `range` (no deletion yet), stopping once the budget runs out.
        let walker = StaticFileAccountChangesetWalker::new(provider, range);
        for result in walker {
            if limiter.is_limit_reached() {
                done = false;
                break;
            }
            let (block_number, changeset) = result?;
            highest_deleted_accounts.insert(changeset.address, block_number);
            last_changeset_pruned_block = Some(block_number);
            changesets_processed += 1;
            limiter.increment_deleted_entries_count();
        }
        trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned account changesets from static files");

        // Checkpoint block: if the scan was interrupted, the last block may only be partially
        // processed, so step back one block; if nothing was scanned at all, the whole range
        // counts as handled up to `range_end`.
        let last_changeset_pruned_block = last_changeset_pruned_block
            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
            .unwrap_or(range_end);

        let mut deleted_shards = 0usize;
        let mut updated_shards = 0usize;

        // Sort accounts so the RocksDB batch is applied in a deterministic key order.
        let mut sorted_accounts: Vec<_> = highest_deleted_accounts.into_iter().collect();
        sorted_accounts.sort_unstable_by_key(|(addr, _)| *addr);

        // Prune history shards, clamping each account's target to the checkpoint block so
        // entries from a partially-scanned block are not removed from the index.
        provider.with_rocksdb_batch(|mut batch| {
            let targets: Vec<_> = sorted_accounts
                .iter()
                .map(|(addr, highest)| (*addr, (*highest).min(last_changeset_pruned_block)))
                .collect();

            let outcomes = batch.prune_account_history_batch(&targets)?;
            deleted_shards = outcomes.deleted;
            updated_shards = outcomes.updated;

            Ok(((), Some(batch.into_inner())))
        })?;
        trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned account history (RocksDB indices)");

        // Static-file changesets are only deleted after the whole range was processed,
        // since segments can only be truncated below a block boundary.
        if done {
            provider.static_file_provider().delete_segment_below_block(
                StaticFileSegment::AccountChangeSets,
                last_changeset_pruned_block + 1,
            )?;
        }

        let progress = limiter.progress(done);

        Ok(SegmentOutput {
            progress,
            // Reported work = scanned changesets + deleted/updated index shards.
            pruned: changesets_processed + deleted_shards + updated_shards,
            checkpoint: Some(SegmentOutputCheckpoint {
                block_number: Some(last_changeset_pruned_block),
                tx_number: None,
            }),
        })
    }
327}
328
329#[cfg(test)]
330mod tests {
331 use super::ACCOUNT_HISTORY_TABLES_TO_PRUNE;
332 use crate::segments::{AccountHistory, PruneInput, PruneLimiter, Segment, SegmentOutput};
333 use alloy_primitives::{BlockNumber, B256};
334 use assert_matches::assert_matches;
335 use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
336 use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
337 use reth_prune_types::{
338 PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
339 };
340 use reth_stages::test_utils::{StorageKind, TestStageDB};
341 use reth_storage_api::StorageSettingsCache;
342 use reth_testing_utils::generators::{
343 self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
344 };
345 use std::{collections::BTreeMap, ops::AddAssign};
346
    /// Legacy (v1, database-backed) pruning: runs the segment three times under a
    /// deleted-entries limit and verifies changesets, history shards, and checkpoints
    /// after each run.
    #[test]
    fn prune_legacy() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            0..0,
            0..0,
        );
        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        // Sanity check: at least one account spans multiple history shards, so shard
        // pruning is actually exercised.
        let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry(key.key).or_default().add_assign(1);
                map
            },
        );
        assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        assert_eq!(
            db.table::<tables::AccountChangeSets>().unwrap().len(),
            changesets.iter().flatten().count()
        );

        let original_shards = db.table::<tables::AccountsHistory>().unwrap();

        // Runs one prune pass and verifies resulting table contents and the checkpoint.
        let test_prune =
            |to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
                let prune_mode = PruneMode::Before(to_block);
                let deleted_entries_limit = 2000;
                let mut limiter =
                    PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
                let input = PruneInput {
                    previous_checkpoint: db
                        .factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::AccountHistory)
                        .unwrap(),
                    to_block,
                    limiter: limiter.clone(),
                };
                let segment = AccountHistory::new(prune_mode);

                let provider = db.factory.database_provider_rw().unwrap();
                provider.set_storage_settings_cache(StorageSettings::v1());
                let result = segment.prune(&provider, input).unwrap();
                limiter.increment_deleted_entries_count_by(result.pruned);

                assert_matches!(
                    result,
                    SegmentOutput {progress, pruned, checkpoint: Some(_)}
                        if (progress, pruned) == expected_result
                );

                segment
                    .save_checkpoint(
                        &provider,
                        result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                    )
                    .unwrap();
                provider.commit().expect("commit");

                // Flatten the fixture into (block_number, change) pairs for bookkeeping.
                let changesets = changesets
                    .iter()
                    .enumerate()
                    .flat_map(|(block_number, changeset)| {
                        changeset.iter().map(move |change| (block_number, change))
                    })
                    .collect::<Vec<_>>();

                // Index of the first entry NOT yet pruned after `run` cumulative passes
                // (each pass prunes at most limit / tables entries, bounded by `to_block`).
                #[expect(clippy::skip_while_next)]
                let pruned = changesets
                    .iter()
                    .enumerate()
                    .skip_while(|(i, (block_number, _))| {
                        *i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
                            *block_number <= to_block as usize
                    })
                    .next()
                    .map(|(i, _)| i)
                    .unwrap_or_default();

                let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));

                // Expected last pruned block: the last fully-pruned block, stepping back one
                // when the run was interrupted mid-block.
                let last_pruned_block_number = pruned_changesets
                    .next()
                    .map(|(block_number, _)| if result.progress.is_finished() {
                        *block_number
                    } else {
                        block_number.saturating_sub(1)
                    } as BlockNumber)
                    .unwrap_or(to_block);

                // Remaining (unpruned) changesets grouped by block.
                let pruned_changesets = pruned_changesets.fold(
                    BTreeMap::<_, Vec<_>>::new(),
                    |mut acc, (block_number, change)| {
                        acc.entry(block_number).or_default().push(change);
                        acc
                    },
                );

                assert_eq!(
                    db.table::<tables::AccountChangeSets>().unwrap().len(),
                    pruned_changesets.values().flatten().count()
                );

                let actual_shards = db.table::<tables::AccountsHistory>().unwrap();

                // Expected shards: drop shards fully below the pruned boundary and strip
                // pruned blocks from the survivors.
                let expected_shards = original_shards
                    .iter()
                    .filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
                    .map(|(key, blocks)| {
                        let new_blocks =
                            blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                        (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                    })
                    .collect::<Vec<_>>();

                assert_eq!(actual_shards, expected_shards);

                assert_eq!(
                    db.factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::AccountHistory)
                        .unwrap(),
                    Some(PruneCheckpoint {
                        block_number: Some(last_pruned_block_number),
                        tx_number: None,
                        prune_mode
                    })
                );
            };

        // First run hits the entry limit; the later runs finish their ranges.
        test_prune(
            998,
            1,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
        );
        test_prune(998, 2, (PruneProgress::Finished, 998));
        test_prune(1400, 3, (PruneProgress::Finished, 804));
    }
506
    /// v2 (RocksDB + static files) pruning path: seeds changesets into static files and
    /// history shards into RocksDB, prunes up to block 50, and checks both stores.
    #[test]
    fn prune_rocksdb_path() {
        use reth_db_api::models::ShardedKey;
        use reth_provider::{RocksDBProviderFactory, StaticFileProviderFactory};

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=100,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            0..0,
            0..0,
        );

        db.insert_changesets_to_static_files(changesets.clone(), None)
            .expect("insert changesets to static files");

        // Per-account list of block numbers that touched the account.
        let mut account_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::new();
        for (block, changeset) in changesets.iter().enumerate() {
            for (address, _, _) in changeset {
                account_blocks.entry(*address).or_default().push(block as u64);
            }
        }

        // Seed one full-range (`u64::MAX`) history shard per account in RocksDB.
        let rocksdb = db.factory.rocksdb_provider();
        let mut batch = rocksdb.batch();
        for (address, block_numbers) in &account_blocks {
            let shard = BlockNumberList::new_pre_sorted(block_numbers.iter().copied());
            batch
                .put::<tables::AccountsHistory>(ShardedKey::new(*address, u64::MAX), &shard)
                .unwrap();
        }
        batch.commit().unwrap();

        // Sanity check the seeded shards before pruning.
        for (address, expected_blocks) in &account_blocks {
            let shards = rocksdb.account_history_shards(*address).unwrap();
            assert_eq!(shards.len(), 1);
            assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), *expected_blocks);
        }

        let to_block: BlockNumber = 50;
        let prune_mode = PruneMode::Before(to_block);
        let input =
            PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
        let segment = AccountHistory::new(prune_mode);

        // Route the segment through the v2 (RocksDB) code path.
        db.factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = db.factory.database_provider_rw().unwrap();
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        assert_matches!(
            result,
            SegmentOutput { progress: PruneProgress::Finished, pruned, checkpoint: Some(_) }
                if pruned > 0
        );

        // Shards must only retain blocks above `to_block`; fully-pruned accounts lose
        // their shard entirely.
        for (address, original_blocks) in &account_blocks {
            let shards = rocksdb.account_history_shards(*address).unwrap();

            let expected_blocks: Vec<u64> =
                original_blocks.iter().copied().filter(|b| *b > to_block).collect();

            if expected_blocks.is_empty() {
                assert!(
                    shards.is_empty(),
                    "Expected no shards for address {address:?} after pruning"
                );
            } else {
                assert_eq!(shards.len(), 1, "Expected 1 shard for address {address:?}");
                assert_eq!(
                    shards[0].1.iter().collect::<Vec<_>>(),
                    expected_blocks,
                    "Shard blocks mismatch for address {address:?}"
                );
            }
        }

        // Static-file changesets at or below `to_block` should have been deleted.
        let static_file_provider = db.factory.static_file_provider();
        let highest_block = static_file_provider.get_highest_static_file_block(
            reth_static_file_types::StaticFileSegment::AccountChangeSets,
        );
        if let Some(block) = highest_block {
            assert!(
                block > to_block,
                "Static files should only contain blocks above to_block ({to_block}), got {block}"
            );
        }
    }
608
    /// Verifies checkpointing when the deleted-entries limit is hit partway through one
    /// block's changesets: the checkpoint must land on the last fully-pruned block, and a
    /// follow-up run must finish the remainder.
    #[test]
    fn prune_partial_progress_mid_block() {
        use alloy_primitives::{Address, U256};
        use reth_primitives_traits::Account;
        use reth_testing_utils::generators::ChangeSet;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let addr1 = Address::with_last_byte(1);
        let addr2 = Address::with_last_byte(2);
        let addr3 = Address::with_last_byte(3);
        let addr4 = Address::with_last_byte(4);
        let addr5 = Address::with_last_byte(5);

        let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };

        // Blocks 0-4 carry one changeset entry each, block 5 carries four, block 6 one.
        let changesets: Vec<ChangeSet> = vec![
            vec![(addr1, account, vec![])],
            vec![(addr1, account, vec![])],
            vec![(addr1, account, vec![])],
            vec![(addr1, account, vec![])],
            vec![(addr1, account, vec![])],
            vec![
                (addr1, account, vec![]),
                (addr2, account, vec![]),
                (addr3, account, vec![]),
                (addr4, account, vec![]),
            ],
            vec![(addr5, account, vec![])],
        ];

        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        assert_eq!(
            db.table::<tables::AccountChangeSets>().unwrap().len(),
            changesets.iter().flatten().count()
        );

        let prune_mode = PruneMode::Before(10);

        // The limit is halved across the two tables, so 14 allows 7 changeset deletions:
        // blocks 0-4 (5 entries) plus 2 of block 5's 4 entries — stopping mid-block.
        let deleted_entries_limit = 14;
        let limiter = PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);

        let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
        let segment = AccountHistory::new(prune_mode);

        let provider = db.factory.database_provider_rw().unwrap();
        provider.set_storage_settings_cache(StorageSettings::v1());
        let result = segment.prune(&provider, input).unwrap();

        assert!(!result.progress.is_finished(), "Expected HasMoreData since we stopped mid-block");

        segment
            .save_checkpoint(&provider, result.checkpoint.unwrap().as_prune_checkpoint(prune_mode))
            .unwrap();
        provider.commit().expect("commit");

        let checkpoint = db
            .factory
            .provider()
            .unwrap()
            .get_prune_checkpoint(PruneSegment::AccountHistory)
            .unwrap()
            .expect("checkpoint should exist");

        assert_eq!(
            checkpoint.block_number,
            Some(4),
            "Checkpoint should be block 4 (block before incomplete block 5)"
        );

        let remaining_changesets = db.table::<tables::AccountChangeSets>().unwrap();
        assert!(
            !remaining_changesets.is_empty(),
            "Should have remaining changesets for blocks 5-6"
        );

        // No surviving history shard may claim a highest block at or below the checkpoint.
        let history = db.table::<tables::AccountsHistory>().unwrap();
        for (key, _blocks) in &history {
            assert!(
                key.highest_block_number > 4,
                "Found stale history shard with highest_block_number {} <= checkpoint 4",
                key.highest_block_number
            );
        }

        // Second run with a generous limit resumes from the checkpoint and finishes.
        let input2 = PruneInput {
            previous_checkpoint: Some(checkpoint),
            to_block: 10,
            limiter: PruneLimiter::default().set_deleted_entries_limit(100),
        };

        let provider2 = db.factory.database_provider_rw().unwrap();
        provider2.set_storage_settings_cache(StorageSettings::v1());
        let result2 = segment.prune(&provider2, input2).unwrap();

        assert!(result2.progress.is_finished(), "Second run should complete");

        segment
            .save_checkpoint(
                &provider2,
                result2.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
            )
            .unwrap();
        provider2.commit().expect("commit");

        let final_checkpoint = db
            .factory
            .provider()
            .unwrap()
            .get_prune_checkpoint(PruneSegment::AccountHistory)
            .unwrap()
            .expect("checkpoint should exist");

        // Block 6 is the highest block that had changesets, so the final checkpoint lands there.
        assert_eq!(final_checkpoint.block_number, Some(6), "Final checkpoint should be at block 6");

        let final_changesets = db.table::<tables::AccountChangeSets>().unwrap();
        assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
    }
765}