1use crate::PruneLimiter;
2use reth_db_api::{
3 cursor::{DbCursorRO, DbCursorRW, RangeWalker},
4 table::{DupSort, Table, TableRow},
5 transaction::{DbTx, DbTxMut},
6 DatabaseError,
7};
8use std::{fmt::Debug, ops::RangeBounds};
9use tracing::debug;
10
11pub(crate) trait DbTxPruneExt: DbTxMut + DbTx {
12 fn clear_table<T: Table>(&self) -> Result<usize, DatabaseError> {
17 let count = self.entries::<T>()?;
18 <Self as DbTxMut>::clear::<T>(self)?;
19 Ok(count)
20 }
21
22 fn prune_table_with_iterator<T: Table>(
26 &self,
27 keys: impl IntoIterator<Item = T::Key>,
28 limiter: &mut PruneLimiter,
29 mut delete_callback: impl FnMut(TableRow<T>),
30 ) -> Result<(usize, bool), DatabaseError> {
31 let mut cursor = self.cursor_write::<T>()?;
32 let mut keys = keys.into_iter().peekable();
33
34 let mut deleted_entries = 0;
35
36 let mut done = true;
37 while keys.peek().is_some() {
38 if limiter.is_limit_reached() {
39 debug!(
40 target: "providers::db",
41 ?limiter,
42 deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
43 time_limit = %limiter.is_time_limit_reached(),
44 table = %T::NAME,
45 "Pruning limit reached"
46 );
47 done = false;
48 break
49 }
50
51 let key = keys.next().expect("peek() said Some");
52 let row = cursor.seek_exact(key)?;
53 if let Some(row) = row {
54 cursor.delete_current()?;
55 limiter.increment_deleted_entries_count();
56 deleted_entries += 1;
57 delete_callback(row);
58 }
59 }
60
61 Ok((deleted_entries, done))
62 }
63
64 fn prune_table_with_range<T: Table>(
68 &self,
69 keys: impl RangeBounds<T::Key> + Clone + Debug,
70 limiter: &mut PruneLimiter,
71 mut skip_filter: impl FnMut(&TableRow<T>) -> bool,
72 mut delete_callback: impl FnMut(TableRow<T>),
73 ) -> Result<(usize, bool), DatabaseError> {
74 let mut cursor = self.cursor_write::<T>()?;
75 let mut walker = cursor.walk_range(keys)?;
76
77 let mut deleted_entries = 0;
78
79 let done = loop {
80 if limiter.is_limit_reached() {
83 debug!(
84 target: "providers::db",
85 ?limiter,
86 deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
87 time_limit = %limiter.is_time_limit_reached(),
88 table = %T::NAME,
89 "Pruning limit reached"
90 );
91 break false
92 }
93
94 let done = self.prune_table_with_range_step(
95 &mut walker,
96 limiter,
97 &mut skip_filter,
98 &mut delete_callback,
99 )?;
100
101 if done {
102 break true
103 }
104 deleted_entries += 1;
105 };
106
107 Ok((deleted_entries, done))
108 }
109
110 fn prune_table_with_range_step<T: Table>(
118 &self,
119 walker: &mut RangeWalker<'_, T, Self::CursorMut<T>>,
120 limiter: &mut PruneLimiter,
121 skip_filter: &mut impl FnMut(&TableRow<T>) -> bool,
122 delete_callback: &mut impl FnMut(TableRow<T>),
123 ) -> Result<bool, DatabaseError> {
124 let Some(res) = walker.next() else { return Ok(true) };
125
126 let row = res?;
127
128 if !skip_filter(&row) {
129 walker.delete_current()?;
130 limiter.increment_deleted_entries_count();
131 delete_callback(row);
132 }
133
134 Ok(false)
135 }
136
137 #[expect(unused)]
141 fn prune_dupsort_table_with_range<T: DupSort>(
142 &self,
143 keys: impl RangeBounds<T::Key> + Clone + Debug,
144 limiter: &mut PruneLimiter,
145 mut delete_callback: impl FnMut(TableRow<T>),
146 ) -> Result<(usize, bool), DatabaseError> {
147 let starting_entries = self.entries::<T>()?;
148 let mut cursor = self.cursor_dup_write::<T>()?;
149 let mut walker = cursor.walk_range(keys)?;
150
151 let done = loop {
152 if limiter.is_limit_reached() {
153 debug!(
154 target: "providers::db",
155 ?limiter,
156 deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
157 time_limit = %limiter.is_time_limit_reached(),
158 table = %T::NAME,
159 "Pruning limit reached"
160 );
161 break false
162 }
163
164 let Some(res) = walker.next() else { break true };
165 let row = res?;
166
167 walker.delete_current_duplicates()?;
168 limiter.increment_deleted_entries_count();
169 delete_callback(row);
170 };
171
172 debug!(
173 target: "providers::db",
174 table=?T::NAME,
175 cursor_current=?cursor.current(),
176 "done walking",
177 );
178
179 let ending_entries = self.entries::<T>()?;
180
181 Ok((starting_entries - ending_entries, done))
182 }
183}
184
185impl<Tx> DbTxPruneExt for Tx where Tx: DbTxMut + DbTx {}
186
#[cfg(test)]
mod tests {
    use super::DbTxPruneExt;
    use crate::PruneLimiter;
    use reth_db_api::tables;
    use reth_primitives_traits::SignerRecoverable;
    use reth_provider::{DBProvider, DatabaseProviderFactory};
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    /// Key source whose iterator records how many times `next` was called on it.
    struct CountingIter {
        data: Vec<u64>,
        calls: Arc<AtomicUsize>,
    }

    impl CountingIter {
        /// Wraps `data`; every `next` call on the resulting iterator bumps `calls`.
        fn new(data: Vec<u64>, calls: Arc<AtomicUsize>) -> Self {
            Self { data, calls }
        }
    }

    /// Iterator produced by [`CountingIter`].
    struct CountingIntoIter {
        inner: std::vec::IntoIter<u64>,
        calls: Arc<AtomicUsize>,
    }

    impl Iterator for CountingIntoIter {
        type Item = u64;

        /// Counts the call (including the final one that yields `None`) and delegates.
        fn next(&mut self) -> Option<Self::Item> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            self.inner.next()
        }
    }

    impl IntoIterator for CountingIter {
        type Item = u64;
        type IntoIter = CountingIntoIter;

        fn into_iter(self) -> Self::IntoIter {
            let Self { data, calls } = self;
            CountingIntoIter { inner: data.into_iter(), calls }
        }
    }

    /// With a deleted-entries limit of 2, pruning must stop after two rows and must have pulled
    /// only one extra key (the peeked one) from the input iterator.
    #[test]
    fn prune_table_with_iterator_early_exit_does_not_overconsume() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let params = BlockRangeParams {
            parent: Some(alloy_primitives::B256::ZERO),
            tx_count: 2..3,
            ..Default::default()
        };
        let blocks = random_block_range(&mut rng, 1..=3, params);
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Number the transactions consecutively across all blocks, starting at zero.
        let senders: Vec<_> = blocks
            .iter()
            .flat_map(|block| block.body().transactions.iter())
            .enumerate()
            .map(|(tx_number, tx)| {
                (tx_number as u64, tx.recover_signer().expect("recover signer"))
            })
            .collect();
        let total = senders.len();
        db.insert_transaction_senders(senders).expect("insert transaction senders");

        let provider = db.factory.database_provider_rw().unwrap();

        let next_calls = Arc::new(AtomicUsize::new(0));
        let key_source = CountingIter::new((0..total as u64).collect(), next_calls.clone());

        let mut limiter = PruneLimiter::default().set_deleted_entries_limit(2);

        let (pruned, done) = provider
            .tx_ref()
            .prune_table_with_iterator::<tables::TransactionSenders>(
                key_source,
                &mut limiter,
                |_| {},
            )
            .expect("prune");

        assert_eq!(pruned, 2);
        assert!(!done);
        assert_eq!(next_calls.load(Ordering::SeqCst), pruned + 1);

        provider.commit().expect("commit");
        assert_eq!(db.table::<tables::TransactionSenders>().unwrap().len(), total - 2);
    }

    /// With an effectively unlimited limiter, every key is consumed, the run reports `done` and
    /// the table ends up empty.
    #[test]
    fn prune_table_with_iterator_consumes_to_end_reports_done() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let params = BlockRangeParams {
            parent: Some(alloy_primitives::B256::ZERO),
            tx_count: 1..2,
            ..Default::default()
        };
        let blocks = random_block_range(&mut rng, 1..=2, params);
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Number the transactions consecutively across all blocks, starting at zero.
        let senders: Vec<_> = blocks
            .iter()
            .flat_map(|block| block.body().transactions.iter())
            .enumerate()
            .map(|(tx_number, tx)| {
                (tx_number as u64, tx.recover_signer().expect("recover signer"))
            })
            .collect();
        let total = senders.len();
        db.insert_transaction_senders(senders).expect("insert transaction senders");

        let provider = db.factory.database_provider_rw().unwrap();

        let next_calls = Arc::new(AtomicUsize::new(0));
        let key_source = CountingIter::new((0..total as u64).collect(), next_calls.clone());

        let mut limiter = PruneLimiter::default().set_deleted_entries_limit(usize::MAX);

        let (pruned, done) = provider
            .tx_ref()
            .prune_table_with_iterator::<tables::TransactionSenders>(
                key_source,
                &mut limiter,
                |_| {},
            )
            .expect("prune");

        assert_eq!(pruned, total);
        assert!(done);
        // One `next` call per key plus the final call that returned `None`.
        assert_eq!(next_calls.load(Ordering::SeqCst), total + 1);

        provider.commit().expect("commit");
        assert_eq!(db.table::<tables::TransactionSenders>().unwrap().len(), 0);
    }
}