// reth_prune/segments/user/storage_history.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{user::history::prune_history_indices, PruneInput, Segment, SegmentOutput},
4    PrunerError,
5};
6use itertools::Itertools;
7use reth_db_api::{
8    models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress},
9    tables,
10    transaction::DbTxMut,
11};
12use reth_provider::DBProvider;
13use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
14use rustc_hash::FxHashMap;
15use tracing::{instrument, trace};
16
/// Number of storage history tables to prune in one step.
///
/// Storage History consists of two tables: [`tables::StorageChangeSets`] and
/// [`tables::StoragesHistory`]. We want to prune them to the same block number, so the
/// deleted-entries budget is split evenly between them.
const STORAGE_HISTORY_TABLES_TO_PRUNE: usize = 2;
22
/// Prune segment responsible for the storage history tables:
/// [`tables::StorageChangeSets`] and [`tables::StoragesHistory`].
#[derive(Debug)]
pub struct StorageHistory {
    /// Prune mode configured for this segment.
    mode: PruneMode,
}
27
impl StorageHistory {
    /// Creates a new [`StorageHistory`] segment with the given prune mode.
    pub const fn new(mode: PruneMode) -> Self {
        Self { mode }
    }
}
33
impl<Provider> Segment<Provider> for StorageHistory
where
    Provider: DBProvider<Tx: DbTxMut>,
{
    /// The prune segment this implementation is responsible for.
    fn segment(&self) -> PruneSegment {
        PruneSegment::StorageHistory
    }

    /// The user-configured prune mode for this segment.
    fn mode(&self) -> Option<PruneMode> {
        Some(self.mode)
    }

    /// This is a user-configured segment (as opposed to a protocol-mandated one).
    fn purpose(&self) -> PrunePurpose {
        PrunePurpose::User
    }

    /// Prunes storage history for the next block range derived from `input`.
    ///
    /// First deletes entries from [`tables::StorageChangeSets`] within the range, collecting
    /// the highest deleted block number per `(address, storage key)` pair, then uses those to
    /// prune the matching shards in [`tables::StoragesHistory`], so both tables end up pruned
    /// to the same block number recorded in the returned checkpoint.
    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
        // Nothing to do if there is no block range left to prune.
        let range = match input.get_next_block_range() {
            Some(range) => range,
            None => {
                trace!(target: "pruner", "No storage history to prune");
                return Ok(SegmentOutput::done())
            }
        };
        let range_end = *range.end();

        // Split the deleted-entries budget across both tables so they are pruned to the same
        // block height.
        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
            input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
        } else {
            input.limiter
        };
        if limiter.is_limit_reached() {
            // Budget already exhausted before doing any work: report "not done" and carry the
            // previous checkpoint forward unchanged.
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        let mut last_changeset_pruned_block = None;
        // Deleted storage changeset keys (account addresses and storage slots) with the highest
        // block number deleted for that key.
        //
        // The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
        // STORAGE_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5
        // / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
        // size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
        // additionally limited by the `max_reorg_depth`, so no OOM is expected here.
        let mut highest_deleted_storages = FxHashMap::default();
        let (pruned_changesets, done) =
            provider.tx_ref().prune_table_with_range::<tables::StorageChangeSets>(
                BlockNumberAddress::range(range),
                &mut limiter,
                |_| false,
                |(BlockNumberAddress((block_number, address)), entry)| {
                    // Later entries within the range overwrite earlier ones, leaving the
                    // highest deleted block number per (address, slot) pair.
                    highest_deleted_storages.insert((address, entry.key), block_number);
                    last_changeset_pruned_block = Some(block_number);
                },
            )?;
        trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)");

        let last_changeset_pruned_block = last_changeset_pruned_block
            // If there's more storage changesets to prune, set the checkpoint block number to
            // previous, so we could finish pruning its storage changesets on the next run.
            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
            .unwrap_or(range_end);

        // Sort highest deleted block numbers by account address and storage key and turn them into
        // sharded keys.
        // We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
        let highest_sharded_keys = highest_deleted_storages
            .into_iter()
            .sorted_unstable() // Unstable is fine because no equal keys exist in the map
            .map(|((address, storage_key), block_number)| {
                StorageShardedKey::new(
                    address,
                    storage_key,
                    // Cap at the checkpoint block so the index is never pruned past the
                    // changesets (the checkpoint may be one block behind when not done).
                    block_number.min(last_changeset_pruned_block),
                )
            });
        let outcomes = prune_history_indices::<Provider, tables::StoragesHistory, _>(
            provider,
            highest_sharded_keys,
            // Two sharded keys belong to the same (address, slot) history.
            |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
        )?;
        trace!(target: "pruner", ?outcomes, %done, "Pruned storage history (indices)");

        let progress = limiter.progress(done);

        Ok(SegmentOutput {
            progress,
            pruned: pruned_changesets + outcomes.deleted,
            checkpoint: Some(SegmentOutputCheckpoint {
                block_number: Some(last_changeset_pruned_block),
                tx_number: None,
            }),
        })
    }
}
133
#[cfg(test)]
mod tests {
    use crate::segments::{
        user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
        SegmentOutput, StorageHistory,
    };
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::{tables, BlockNumberList};
    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{
        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
    };
    use std::{collections::BTreeMap, ops::AddAssign};

    // End-to-end test: seeds a test DB with random blocks, changesets and history shards,
    // then runs the pruner repeatedly and checks both tables and the stored checkpoint
    // against independently computed expectations.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // 5001 blocks with 0..1 transactions each.
        let blocks = random_block_range(
            &mut rng,
            0..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        // One changeset with one storage entry per account per block.
        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            1..2,
            1..2,
        );
        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        // Sanity check: at least one (address, slot) pair spans multiple history shards, so the
        // shard-pruning path is actually exercised.
        let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
                map
            },
        );
        assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        // Sanity check: the changesets table holds exactly the generated entries.
        assert_eq!(
            db.table::<tables::StorageChangeSets>().unwrap().len(),
            changesets.iter().flatten().flat_map(|(_, _, entries)| entries).count()
        );

        // Snapshot of the unpruned history shards, used to compute expected post-prune shards.
        let original_shards = db.table::<tables::StoragesHistory>().unwrap();

        // Runs one prune pass up to `to_block` (cumulative `run` counter across calls) and
        // verifies the segment output, table contents, and saved checkpoint.
        let test_prune = |to_block: BlockNumber,
                          run: usize,
                          expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let deleted_entries_limit = 1000;
            let mut limiter =
                PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                to_block,
                limiter: limiter.clone(),
            };
            let segment = StorageHistory::new(prune_mode);

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            // Persist the checkpoint so the next run resumes from the right block.
            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // Flatten generated changesets into (block_number, address, entry) tuples.
            let changesets = changesets
                .iter()
                .enumerate()
                .flat_map(|(block_number, changeset)| {
                    changeset.iter().flat_map(move |(address, _, entries)| {
                        entries.iter().map(move |entry| (block_number, address, entry))
                    })
                })
                .collect::<Vec<_>>();

            // Index of the first entry NOT yet pruned: the pruner deletes at most
            // limit / STORAGE_HISTORY_TABLES_TO_PRUNE changeset entries per run, and only
            // entries at or below `to_block`.
            #[expect(clippy::skip_while_next)]
            let pruned = changesets
                .iter()
                .enumerate()
                .skip_while(|(i, (block_number, _, _))| {
                    *i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
                        *block_number <= to_block as usize
                })
                .next()
                .map(|(i, _)| i)
                .unwrap_or_default();

            let mut pruned_changesets = changesets
                .iter()
                // Skip what we've pruned so far, subtracting one to get last pruned block number
                // further down
                .skip(pruned.saturating_sub(1));

            // Mirrors the pruner's checkpoint logic: when interrupted mid-block, the checkpoint
            // points at the previous block so that block is revisited next run.
            let last_pruned_block_number = pruned_changesets
                .next()
                .map(|(block_number, _, _)| if result.progress.is_finished() {
                    *block_number
                } else {
                    block_number.saturating_sub(1)
                } as BlockNumber)
                .unwrap_or(to_block);

            // Group the remaining (unpruned) changesets by (block, address) for comparison.
            let pruned_changesets = pruned_changesets.fold(
                BTreeMap::<_, Vec<_>>::new(),
                |mut acc, (block_number, address, entry)| {
                    acc.entry((block_number, address)).or_default().push(entry);
                    acc
                },
            );

            assert_eq!(
                db.table::<tables::StorageChangeSets>().unwrap().len(),
                pruned_changesets.values().flatten().count()
            );

            let actual_shards = db.table::<tables::StoragesHistory>().unwrap();

            // Expected shards: drop shards wholly at or below the pruned height, and trim
            // block numbers at or below it from the surviving ones.
            let expected_shards = original_shards
                .iter()
                .filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
                .map(|(key, blocks)| {
                    let new_blocks =
                        blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                    (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                })
                .collect::<Vec<_>>();

            assert_eq!(actual_shards, expected_shards);

            // The saved checkpoint must reflect the last pruned block and the prune mode.
            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: Some(last_pruned_block_number),
                    tx_number: None,
                    prune_mode
                })
            );
        };

        // Run 1: limit (1000 / 2 = 500 per table) is hit before reaching block 998.
        test_prune(
            998,
            1,
            (
                PruneProgress::HasMoreData(
                    reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
                ),
                500,
            ),
        );
        // Run 2: finishes pruning up to block 998.
        test_prune(998, 2, (PruneProgress::Finished, 499));
        // Run 3: extends the pruned range to block 1200 in a single pass.
        test_prune(1200, 3, (PruneProgress::Finished, 202));
    }
}
319}