reth_prune/segments/user/account_history.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{user::history::prune_history_indices, PruneInput, Segment},
4    PrunerError,
5};
6use itertools::Itertools;
7use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
8use reth_provider::DBProvider;
9use reth_prune_types::{
10    PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
11};
12use rustc_hash::FxHashMap;
13use tracing::{instrument, trace};
14
/// How many tables the account history segment prunes together.
///
/// The segment deletes rows from both [`tables::AccountChangeSets`] and
/// [`tables::AccountsHistory`], and both must end up pruned to the same block number. The
/// deleted-entries limit handed to the segment is divided by this value, so each table gets an
/// equal share of the budget.
const ACCOUNT_HISTORY_TABLES_TO_PRUNE: usize = 2;
20
21#[derive(Debug)]
22pub struct AccountHistory {
23    mode: PruneMode,
24}
25
26impl AccountHistory {
27    pub const fn new(mode: PruneMode) -> Self {
28        Self { mode }
29    }
30}
31
32impl<Provider> Segment<Provider> for AccountHistory
33where
34    Provider: DBProvider<Tx: DbTxMut>,
35{
36    fn segment(&self) -> PruneSegment {
37        PruneSegment::AccountHistory
38    }
39
40    fn mode(&self) -> Option<PruneMode> {
41        Some(self.mode)
42    }
43
44    fn purpose(&self) -> PrunePurpose {
45        PrunePurpose::User
46    }
47
48    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
49    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
50        let range = match input.get_next_block_range() {
51            Some(range) => range,
52            None => {
53                trace!(target: "pruner", "No account history to prune");
54                return Ok(SegmentOutput::done())
55            }
56        };
57        let range_end = *range.end();
58
59        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
60            input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
61        } else {
62            input.limiter
63        };
64        if limiter.is_limit_reached() {
65            return Ok(SegmentOutput::not_done(
66                limiter.interrupt_reason(),
67                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
68            ))
69        }
70
71        let mut last_changeset_pruned_block = None;
72        // Deleted account changeset keys (account addresses) with the highest block number deleted
73        // for that key.
74        //
75        // The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
76        // ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
77        // / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
78        // size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
79        // additionally limited by the `max_reorg_depth`, so no OOM is expected here.
80        let mut highest_deleted_accounts = FxHashMap::default();
81        let (pruned_changesets, done) =
82            provider.tx_ref().prune_table_with_range::<tables::AccountChangeSets>(
83                range,
84                &mut limiter,
85                |_| false,
86                |(block_number, account)| {
87                    highest_deleted_accounts.insert(account.address, block_number);
88                    last_changeset_pruned_block = Some(block_number);
89                },
90            )?;
91        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets)");
92
93        let last_changeset_pruned_block = last_changeset_pruned_block
94            // If there's more account changesets to prune, set the checkpoint block number to
95            // previous, so we could finish pruning its account changesets on the next run.
96            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
97            .unwrap_or(range_end);
98
99        // Sort highest deleted block numbers by account address and turn them into sharded keys.
100        // We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
101        let highest_sharded_keys = highest_deleted_accounts
102            .into_iter()
103            .sorted_unstable() // Unstable is fine because no equal keys exist in the map
104            .map(|(address, block_number)| {
105                ShardedKey::new(address, block_number.min(last_changeset_pruned_block))
106            });
107        let outcomes = prune_history_indices::<Provider, tables::AccountsHistory, _>(
108            provider,
109            highest_sharded_keys,
110            |a, b| a.key == b.key,
111        )?;
112        trace!(target: "pruner", ?outcomes, %done, "Pruned account history (indices)");
113
114        let progress = limiter.progress(done);
115
116        Ok(SegmentOutput {
117            progress,
118            pruned: pruned_changesets + outcomes.deleted,
119            checkpoint: Some(SegmentOutputCheckpoint {
120                block_number: Some(last_changeset_pruned_block),
121                tx_number: None,
122            }),
123        })
124    }
125}
126
#[cfg(test)]
mod tests {
    use crate::segments::{
        user::account_history::ACCOUNT_HISTORY_TABLES_TO_PRUNE, AccountHistory, PruneInput,
        PruneLimiter, Segment, SegmentOutput,
    };
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::{tables, BlockNumberList};
    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{
        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
    };
    use std::{collections::BTreeMap, ops::AddAssign};

    /// Inserts 5000 random blocks with account changesets for two accounts (asserting that at
    /// least one account spans several history shards). It then prunes in three runs with a
    /// deleted-entries limit of 2000. After each run it checks the remaining changesets, the
    /// remaining history shards and the saved checkpoint.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            0..0,
            0..0,
        );
        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry(key.key).or_default().add_assign(1);
                map
            },
        );
        assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        assert_eq!(
            db.table::<tables::AccountChangeSets>().unwrap().len(),
            changesets.iter().flatten().count()
        );

        let original_shards = db.table::<tables::AccountsHistory>().unwrap();

        let test_prune =
            |to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
                let prune_mode = PruneMode::Before(to_block);
                let deleted_entries_limit = 2000;
                let input = PruneInput {
                    previous_checkpoint: db
                        .factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::AccountHistory)
                        .unwrap(),
                    to_block,
                    limiter: PruneLimiter::default()
                        .set_deleted_entries_limit(deleted_entries_limit),
                };
                let segment = AccountHistory::new(prune_mode);

                let provider = db.factory.database_provider_rw().unwrap();
                let result = segment.prune(&provider, input).unwrap();

                assert_matches!(
                    result,
                    SegmentOutput {progress, pruned, checkpoint: Some(_)}
                        if (progress, pruned) == expected_result
                );

                segment
                    .save_checkpoint(
                        &provider,
                        result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                    )
                    .unwrap();
                provider.commit().expect("commit");

                let changesets = changesets
                    .iter()
                    .enumerate()
                    .flat_map(|(block_number, changeset)| {
                        changeset.iter().map(move |change| (block_number, change))
                    })
                    .collect::<Vec<_>>();

                // Index of the first changeset that should survive after this run. Runs delete
                // at most `deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE` changesets
                // each (cumulatively `* run`), and only those at or below `to_block`.
                let pruned = changesets
                    .iter()
                    .enumerate()
                    .find(|(i, (block_number, _))| {
                        *i >= deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run ||
                            *block_number > to_block as usize
                    })
                    .map(|(i, _)| i)
                    .unwrap_or_default();

                let mut pruned_changesets = changesets
                    .iter()
                    // Skip what we've pruned so far, subtracting one to get last pruned block
                    // number further down
                    .skip(pruned.saturating_sub(1));

                let last_pruned_block_number = pruned_changesets
                    .next()
                    .map(|(block_number, _)| {
                        let block_number = if result.progress.is_finished() {
                            *block_number
                        } else {
                            block_number.saturating_sub(1)
                        };
                        block_number as BlockNumber
                    })
                    .unwrap_or(to_block);

                let pruned_changesets = pruned_changesets.fold(
                    BTreeMap::<_, Vec<_>>::new(),
                    |mut acc, (block_number, change)| {
                        acc.entry(block_number).or_default().push(change);
                        acc
                    },
                );

                assert_eq!(
                    db.table::<tables::AccountChangeSets>().unwrap().len(),
                    pruned_changesets.values().flatten().count()
                );

                let actual_shards = db.table::<tables::AccountsHistory>().unwrap();

                let expected_shards = original_shards
                    .iter()
                    .filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
                    .map(|(key, blocks)| {
                        let new_blocks =
                            blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                        (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                    })
                    .collect::<Vec<_>>();

                assert_eq!(actual_shards, expected_shards);

                assert_eq!(
                    db.factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::AccountHistory)
                        .unwrap(),
                    Some(PruneCheckpoint {
                        block_number: Some(last_pruned_block_number),
                        tx_number: None,
                        prune_mode
                    })
                );
            };

        test_prune(
            998,
            1,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
        );
        test_prune(998, 2, (PruneProgress::Finished, 998));
        test_prune(1400, 3, (PruneProgress::Finished, 804));
    }
}