reth_prune/segments/user/
account_history.rs
1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{user::history::prune_history_indices, PruneInput, Segment},
4 PrunerError,
5};
6use itertools::Itertools;
7use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
8use reth_provider::DBProvider;
9use reth_prune_types::{
10 PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
11};
12use rustc_hash::FxHashMap;
13use tracing::{instrument, trace};
14
15const ACCOUNT_HISTORY_TABLES_TO_PRUNE: usize = 2;
20
21#[derive(Debug)]
22pub struct AccountHistory {
23 mode: PruneMode,
24}
25
26impl AccountHistory {
27 pub const fn new(mode: PruneMode) -> Self {
28 Self { mode }
29 }
30}
31
32impl<Provider> Segment<Provider> for AccountHistory
33where
34 Provider: DBProvider<Tx: DbTxMut>,
35{
36 fn segment(&self) -> PruneSegment {
37 PruneSegment::AccountHistory
38 }
39
40 fn mode(&self) -> Option<PruneMode> {
41 Some(self.mode)
42 }
43
44 fn purpose(&self) -> PrunePurpose {
45 PrunePurpose::User
46 }
47
48 #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
49 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
50 let range = match input.get_next_block_range() {
51 Some(range) => range,
52 None => {
53 trace!(target: "pruner", "No account history to prune");
54 return Ok(SegmentOutput::done())
55 }
56 };
57 let range_end = *range.end();
58
59 let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
60 input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
61 } else {
62 input.limiter
63 };
64 if limiter.is_limit_reached() {
65 return Ok(SegmentOutput::not_done(
66 limiter.interrupt_reason(),
67 input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
68 ))
69 }
70
71 let mut last_changeset_pruned_block = None;
72 let mut highest_deleted_accounts = FxHashMap::default();
81 let (pruned_changesets, done) =
82 provider.tx_ref().prune_table_with_range::<tables::AccountChangeSets>(
83 range,
84 &mut limiter,
85 |_| false,
86 |(block_number, account)| {
87 highest_deleted_accounts.insert(account.address, block_number);
88 last_changeset_pruned_block = Some(block_number);
89 },
90 )?;
91 trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets)");
92
93 let last_changeset_pruned_block = last_changeset_pruned_block
94 .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
97 .unwrap_or(range_end);
98
99 let highest_sharded_keys = highest_deleted_accounts
102 .into_iter()
103 .sorted_unstable() .map(|(address, block_number)| {
105 ShardedKey::new(address, block_number.min(last_changeset_pruned_block))
106 });
107 let outcomes = prune_history_indices::<Provider, tables::AccountsHistory, _>(
108 provider,
109 highest_sharded_keys,
110 |a, b| a.key == b.key,
111 )?;
112 trace!(target: "pruner", ?outcomes, %done, "Pruned account history (indices)");
113
114 let progress = limiter.progress(done);
115
116 Ok(SegmentOutput {
117 progress,
118 pruned: pruned_changesets + outcomes.deleted,
119 checkpoint: Some(SegmentOutputCheckpoint {
120 block_number: Some(last_changeset_pruned_block),
121 tx_number: None,
122 }),
123 })
124 }
125}
126
127#[cfg(test)]
128mod tests {
129 use crate::segments::{
130 user::account_history::ACCOUNT_HISTORY_TABLES_TO_PRUNE, AccountHistory, PruneInput,
131 PruneLimiter, Segment, SegmentOutput,
132 };
133 use alloy_primitives::{BlockNumber, B256};
134 use assert_matches::assert_matches;
135 use reth_db_api::{tables, BlockNumberList};
136 use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
137 use reth_prune_types::{
138 PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
139 };
140 use reth_stages::test_utils::{StorageKind, TestStageDB};
141 use reth_testing_utils::generators::{
142 self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
143 };
144 use std::{collections::BTreeMap, ops::AddAssign};
145
146 #[test]
147 fn prune() {
148 let db = TestStageDB::default();
149 let mut rng = generators::rng();
150
151 let blocks = random_block_range(
152 &mut rng,
153 1..=5000,
154 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
155 );
156 db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
157
158 let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
159
160 let (changesets, _) = random_changeset_range(
161 &mut rng,
162 blocks.iter(),
163 accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
164 0..0,
165 0..0,
166 );
167 db.insert_changesets(changesets.clone(), None).expect("insert changesets");
168 db.insert_history(changesets.clone(), None).expect("insert history");
169
170 let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
171 BTreeMap::<_, usize>::new(),
172 |mut map, (key, _)| {
173 map.entry(key.key).or_default().add_assign(1);
174 map
175 },
176 );
177 assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
178
179 assert_eq!(
180 db.table::<tables::AccountChangeSets>().unwrap().len(),
181 changesets.iter().flatten().count()
182 );
183
184 let original_shards = db.table::<tables::AccountsHistory>().unwrap();
185
186 let test_prune =
187 |to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
188 let prune_mode = PruneMode::Before(to_block);
189 let deleted_entries_limit = 2000;
190 let mut limiter =
191 PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
192 let input = PruneInput {
193 previous_checkpoint: db
194 .factory
195 .provider()
196 .unwrap()
197 .get_prune_checkpoint(PruneSegment::AccountHistory)
198 .unwrap(),
199 to_block,
200 limiter: limiter.clone(),
201 };
202 let segment = AccountHistory::new(prune_mode);
203
204 let provider = db.factory.database_provider_rw().unwrap();
205 let result = segment.prune(&provider, input).unwrap();
206 limiter.increment_deleted_entries_count_by(result.pruned);
207
208 assert_matches!(
209 result,
210 SegmentOutput {progress, pruned, checkpoint: Some(_)}
211 if (progress, pruned) == expected_result
212 );
213
214 segment
215 .save_checkpoint(
216 &provider,
217 result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
218 )
219 .unwrap();
220 provider.commit().expect("commit");
221
222 let changesets = changesets
223 .iter()
224 .enumerate()
225 .flat_map(|(block_number, changeset)| {
226 changeset.iter().map(move |change| (block_number, change))
227 })
228 .collect::<Vec<_>>();
229
230 #[expect(clippy::skip_while_next)]
231 let pruned = changesets
232 .iter()
233 .enumerate()
234 .skip_while(|(i, (block_number, _))| {
235 *i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
236 *block_number <= to_block as usize
237 })
238 .next()
239 .map(|(i, _)| i)
240 .unwrap_or_default();
241
242 let mut pruned_changesets = changesets
243 .iter()
244 .skip(pruned.saturating_sub(1));
247
248 let last_pruned_block_number = pruned_changesets
249 .next()
250 .map(|(block_number, _)| if result.progress.is_finished() {
251 *block_number
252 } else {
253 block_number.saturating_sub(1)
254 } as BlockNumber)
255 .unwrap_or(to_block);
256
257 let pruned_changesets = pruned_changesets.fold(
258 BTreeMap::<_, Vec<_>>::new(),
259 |mut acc, (block_number, change)| {
260 acc.entry(block_number).or_default().push(change);
261 acc
262 },
263 );
264
265 assert_eq!(
266 db.table::<tables::AccountChangeSets>().unwrap().len(),
267 pruned_changesets.values().flatten().count()
268 );
269
270 let actual_shards = db.table::<tables::AccountsHistory>().unwrap();
271
272 let expected_shards = original_shards
273 .iter()
274 .filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
275 .map(|(key, blocks)| {
276 let new_blocks =
277 blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
278 (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
279 })
280 .collect::<Vec<_>>();
281
282 assert_eq!(actual_shards, expected_shards);
283
284 assert_eq!(
285 db.factory
286 .provider()
287 .unwrap()
288 .get_prune_checkpoint(PruneSegment::AccountHistory)
289 .unwrap(),
290 Some(PruneCheckpoint {
291 block_number: Some(last_pruned_block_number),
292 tx_number: None,
293 prune_mode
294 })
295 );
296 };
297
298 test_prune(
299 998,
300 1,
301 (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
302 );
303 test_prune(998, 2, (PruneProgress::Finished, 998));
304 test_prune(1400, 3, (PruneProgress::Finished, 804));
305 }
306}