1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{user::history::prune_history_indices, PruneInput, Segment, SegmentOutput},
4 PrunerError,
5};
6use itertools::Itertools;
7use reth_db_api::{
8 models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress},
9 tables,
10 transaction::DbTxMut,
11};
12use reth_provider::DBProvider;
13use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
14use rustc_hash::FxHashMap;
15use tracing::{instrument, trace};
16
/// Number of tables pruned by the storage history segment: [`tables::StorageChangeSets`] and
/// [`tables::StoragesHistory`].
///
/// The deleted-entries limit handed to the segment is divided by this value so that both tables
/// share the pruning budget of a single run.
const STORAGE_HISTORY_TABLES_TO_PRUNE: usize = 2;
22
23#[derive(Debug)]
24pub struct StorageHistory {
25 mode: PruneMode,
26}
27
28impl StorageHistory {
29 pub const fn new(mode: PruneMode) -> Self {
30 Self { mode }
31 }
32}
33
34impl<Provider> Segment<Provider> for StorageHistory
35where
36 Provider: DBProvider<Tx: DbTxMut>,
37{
38 fn segment(&self) -> PruneSegment {
39 PruneSegment::StorageHistory
40 }
41
42 fn mode(&self) -> Option<PruneMode> {
43 Some(self.mode)
44 }
45
46 fn purpose(&self) -> PrunePurpose {
47 PrunePurpose::User
48 }
49
50 #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
51 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
52 let range = match input.get_next_block_range() {
53 Some(range) => range,
54 None => {
55 trace!(target: "pruner", "No storage history to prune");
56 return Ok(SegmentOutput::done())
57 }
58 };
59 let range_end = *range.end();
60
61 let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
62 input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
63 } else {
64 input.limiter
65 };
66 if limiter.is_limit_reached() {
67 return Ok(SegmentOutput::not_done(
68 limiter.interrupt_reason(),
69 input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
70 ))
71 }
72
73 let mut last_changeset_pruned_block = None;
74 let mut highest_deleted_storages = FxHashMap::default();
83 let (pruned_changesets, done) =
84 provider.tx_ref().prune_table_with_range::<tables::StorageChangeSets>(
85 BlockNumberAddress::range(range),
86 &mut limiter,
87 |_| false,
88 |(BlockNumberAddress((block_number, address)), entry)| {
89 highest_deleted_storages.insert((address, entry.key), block_number);
90 last_changeset_pruned_block = Some(block_number);
91 },
92 )?;
93 trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)");
94
95 let last_changeset_pruned_block = last_changeset_pruned_block
96 .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
99 .unwrap_or(range_end);
100
101 let highest_sharded_keys = highest_deleted_storages
105 .into_iter()
106 .sorted_unstable() .map(|((address, storage_key), block_number)| {
108 StorageShardedKey::new(
109 address,
110 storage_key,
111 block_number.min(last_changeset_pruned_block),
112 )
113 });
114 let outcomes = prune_history_indices::<Provider, tables::StoragesHistory, _>(
115 provider,
116 highest_sharded_keys,
117 |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
118 )?;
119 trace!(target: "pruner", ?outcomes, %done, "Pruned storage history (indices)");
120
121 let progress = limiter.progress(done);
122
123 Ok(SegmentOutput {
124 progress,
125 pruned: pruned_changesets + outcomes.deleted,
126 checkpoint: Some(SegmentOutputCheckpoint {
127 block_number: Some(last_changeset_pruned_block),
128 tx_number: None,
129 }),
130 })
131 }
132}
133
#[cfg(test)]
mod tests {
    use crate::segments::{
        user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
        SegmentOutput, StorageHistory,
    };
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::{tables, BlockNumberList};
    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{
        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
    };
    use std::{collections::BTreeMap, ops::AddAssign};

    /// Runs the storage history segment three times and checks the results after each run.
    ///
    /// The first two runs target block 998; the first is expected to stop on the
    /// deleted-entries limit. The third run targets block 1200. After each run the test checks
    /// the remaining changesets, the index shards and the saved checkpoint.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            1..2,
            1..2,
        );
        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        // Require at least one (address, storage slot) with more than one index shard, so the
        // test covers multi-shard keys.
        let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
                map
            },
        );
        assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        assert_eq!(
            db.table::<tables::StorageChangeSets>().unwrap().len(),
            changesets.iter().flatten().flat_map(|(_, _, entries)| entries).count()
        );

        let original_shards = db.table::<tables::StoragesHistory>().unwrap();

        let test_prune = |to_block: BlockNumber,
                          run: usize,
                          expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let deleted_entries_limit = 1000;
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                to_block,
                // A fresh limiter is built for every run, so each call to `prune` gets the full
                // budget.
                limiter: PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit),
            };
            let segment = StorageHistory::new(prune_mode);

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // Flatten the generated changesets into `(block_number, address, entry)` rows.
            let changesets = changesets
                .iter()
                .enumerate()
                .flat_map(|(block_number, changeset)| {
                    changeset.iter().flat_map(move |(address, _, entries)| {
                        entries.iter().map(move |entry| (block_number, address, entry))
                    })
                })
                .collect::<Vec<_>>();

            // Index of the first row expected to survive. Each run so far could delete at most
            // `deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE` changesets, and only rows
            // up to `to_block`.
            #[expect(clippy::skip_while_next)]
            let pruned = changesets
                .iter()
                .enumerate()
                .skip_while(|(i, (block_number, _, _))| {
                    *i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
                        *block_number <= to_block as usize
                })
                .next()
                .map(|(i, _)| i)
                .unwrap_or_default();

            let mut pruned_changesets = changesets
                .iter()
                // Skip what has been pruned so far, minus one, so that `next()` below yields the
                // last pruned row and therefore the last pruned block number.
                .skip(pruned.saturating_sub(1));

            let last_pruned_block_number = pruned_changesets
                .next()
                .map(|(block_number, _, _)| if result.progress.is_finished() {
                    *block_number
                } else {
                    block_number.saturating_sub(1)
                } as BlockNumber)
                .unwrap_or(to_block);

            // The remaining rows are the changesets expected to still be in the table.
            let pruned_changesets = pruned_changesets.fold(
                BTreeMap::<_, Vec<_>>::new(),
                |mut acc, (block_number, address, entry)| {
                    acc.entry((block_number, address)).or_default().push(entry);
                    acc
                },
            );

            assert_eq!(
                db.table::<tables::StorageChangeSets>().unwrap().len(),
                pruned_changesets.values().flatten().count()
            );

            let actual_shards = db.table::<tables::StoragesHistory>().unwrap();

            // Shards whose highest block is at or below the checkpoint are removed. The rest keep
            // only the block numbers above it.
            let expected_shards = original_shards
                .iter()
                .filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
                .map(|(key, blocks)| {
                    let new_blocks =
                        blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                    (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                })
                .collect::<Vec<_>>();

            assert_eq!(actual_shards, expected_shards);

            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: Some(last_pruned_block_number),
                    tx_number: None,
                    prune_mode
                })
            );
        };

        test_prune(
            998,
            1,
            (
                PruneProgress::HasMoreData(
                    reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
                ),
                500,
            ),
        );
        test_prune(998, 2, (PruneProgress::Finished, 499));
        test_prune(1200, 3, (PruneProgress::Finished, 202));
    }
}