reth_prune/
db_ext.rs

1use crate::PruneLimiter;
2use reth_db_api::{
3    cursor::{DbCursorRO, DbCursorRW, RangeWalker},
4    table::{DupSort, Table, TableRow},
5    transaction::{DbTx, DbTxMut},
6    DatabaseError,
7};
8use std::{fmt::Debug, ops::RangeBounds};
9use tracing::debug;
10
11pub(crate) trait DbTxPruneExt: DbTxMut + DbTx {
12    /// Clear the entire table in a single operation.
13    ///
14    /// This is much faster than iterating entry-by-entry for `PruneMode::Full`.
15    /// Returns the number of entries that were in the table.
16    fn clear_table<T: Table>(&self) -> Result<usize, DatabaseError> {
17        let count = self.entries::<T>()?;
18        <Self as DbTxMut>::clear::<T>(self)?;
19        Ok(count)
20    }
21
22    /// Prune the table for the specified pre-sorted key iterator.
23    ///
24    /// Returns number of rows pruned.
25    fn prune_table_with_iterator<T: Table>(
26        &self,
27        keys: impl IntoIterator<Item = T::Key>,
28        limiter: &mut PruneLimiter,
29        mut delete_callback: impl FnMut(TableRow<T>),
30    ) -> Result<(usize, bool), DatabaseError> {
31        let mut cursor = self.cursor_write::<T>()?;
32        let mut keys = keys.into_iter().peekable();
33
34        let mut deleted_entries = 0;
35
36        let mut done = true;
37        while keys.peek().is_some() {
38            if limiter.is_limit_reached() {
39                debug!(
40                    target: "providers::db",
41                    ?limiter,
42                    deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
43                    time_limit = %limiter.is_time_limit_reached(),
44                    table = %T::NAME,
45                    "Pruning limit reached"
46                );
47                done = false;
48                break
49            }
50
51            let key = keys.next().expect("peek() said Some");
52            let row = cursor.seek_exact(key)?;
53            if let Some(row) = row {
54                cursor.delete_current()?;
55                limiter.increment_deleted_entries_count();
56                deleted_entries += 1;
57                delete_callback(row);
58            }
59        }
60
61        Ok((deleted_entries, done))
62    }
63
64    /// Prune the table for the specified key range.
65    ///
66    /// Returns number of rows pruned.
67    fn prune_table_with_range<T: Table>(
68        &self,
69        keys: impl RangeBounds<T::Key> + Clone + Debug,
70        limiter: &mut PruneLimiter,
71        mut skip_filter: impl FnMut(&TableRow<T>) -> bool,
72        mut delete_callback: impl FnMut(TableRow<T>),
73    ) -> Result<(usize, bool), DatabaseError> {
74        let mut cursor = self.cursor_write::<T>()?;
75        let mut walker = cursor.walk_range(keys)?;
76
77        let mut deleted_entries = 0;
78
79        let done = loop {
80            // check for time out must be done in this scope since it's not done in
81            // `prune_table_with_range_step`
82            if limiter.is_limit_reached() {
83                debug!(
84                    target: "providers::db",
85                    ?limiter,
86                    deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
87                    time_limit = %limiter.is_time_limit_reached(),
88                    table = %T::NAME,
89                    "Pruning limit reached"
90                );
91                break false
92            }
93
94            let done = self.prune_table_with_range_step(
95                &mut walker,
96                limiter,
97                &mut skip_filter,
98                &mut delete_callback,
99            )?;
100
101            if done {
102                break true
103            }
104            deleted_entries += 1;
105        };
106
107        Ok((deleted_entries, done))
108    }
109
110    /// Steps once with the given walker and prunes the entry in the table.
111    ///
112    /// Returns `true` if the walker is finished, `false` if it may have more data to prune.
113    ///
114    /// CAUTION: Pruner limits are not checked. This allows for a clean exit of a prune run that's
115    /// pruning different tables concurrently, by letting them step to the same height before
116    /// timing out.
117    fn prune_table_with_range_step<T: Table>(
118        &self,
119        walker: &mut RangeWalker<'_, T, Self::CursorMut<T>>,
120        limiter: &mut PruneLimiter,
121        skip_filter: &mut impl FnMut(&TableRow<T>) -> bool,
122        delete_callback: &mut impl FnMut(TableRow<T>),
123    ) -> Result<bool, DatabaseError> {
124        let Some(res) = walker.next() else { return Ok(true) };
125
126        let row = res?;
127
128        if !skip_filter(&row) {
129            walker.delete_current()?;
130            limiter.increment_deleted_entries_count();
131            delete_callback(row);
132        }
133
134        Ok(false)
135    }
136
137    /// Prune a DUPSORT table for the specified key range.
138    ///
139    /// Returns number of rows pruned.
140    #[expect(unused)]
141    fn prune_dupsort_table_with_range<T: DupSort>(
142        &self,
143        keys: impl RangeBounds<T::Key> + Clone + Debug,
144        limiter: &mut PruneLimiter,
145        mut delete_callback: impl FnMut(TableRow<T>),
146    ) -> Result<(usize, bool), DatabaseError> {
147        let starting_entries = self.entries::<T>()?;
148        let mut cursor = self.cursor_dup_write::<T>()?;
149        let mut walker = cursor.walk_range(keys)?;
150
151        let done = loop {
152            if limiter.is_limit_reached() {
153                debug!(
154                    target: "providers::db",
155                    ?limiter,
156                    deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
157                    time_limit = %limiter.is_time_limit_reached(),
158                    table = %T::NAME,
159                    "Pruning limit reached"
160                );
161                break false
162            }
163
164            let Some(res) = walker.next() else { break true };
165            let row = res?;
166
167            walker.delete_current_duplicates()?;
168            limiter.increment_deleted_entries_count();
169            delete_callback(row);
170        };
171
172        debug!(
173            target: "providers::db",
174            table=?T::NAME,
175            cursor_current=?cursor.current(),
176            "done walking",
177        );
178
179        let ending_entries = self.entries::<T>()?;
180
181        Ok((starting_entries - ending_entries, done))
182    }
183}
184
185impl<Tx> DbTxPruneExt for Tx where Tx: DbTxMut + DbTx {}
186
#[cfg(test)]
mod tests {
    use super::DbTxPruneExt;
    use crate::PruneLimiter;
    use reth_db_api::tables;
    use reth_primitives_traits::SignerRecoverable;
    use reth_provider::{DBProvider, DatabaseProviderFactory};
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    /// Key source whose iterator bumps a shared counter on every `next()` call. This lets tests
    /// observe exactly how many items the pruner pulled.
    struct CountingIter {
        data: Vec<u64>,
        calls: Arc<AtomicUsize>,
    }

    impl CountingIter {
        fn new(data: Vec<u64>, calls: Arc<AtomicUsize>) -> Self {
            Self { data, calls }
        }
    }

    /// Iterator produced by [`CountingIter`]. Calls that return `None` are counted too.
    struct CountingIntoIter {
        inner: std::vec::IntoIter<u64>,
        calls: Arc<AtomicUsize>,
    }

    impl Iterator for CountingIntoIter {
        type Item = u64;

        fn next(&mut self) -> Option<Self::Item> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            self.inner.next()
        }
    }

    impl IntoIterator for CountingIter {
        type Item = u64;
        type IntoIter = CountingIntoIter;

        fn into_iter(self) -> Self::IntoIter {
            let Self { data, calls } = self;
            CountingIntoIter { inner: data.into_iter(), calls }
        }
    }

    /// Inserts random blocks `1..=last_block` built from `params`, plus one
    /// `TransactionSenders` row per transaction keyed by a zero-based transaction number.
    ///
    /// Returns the number of sender rows inserted.
    fn seed_transaction_senders(
        db: &TestStageDB,
        last_block: u64,
        params: BlockRangeParams,
    ) -> usize {
        let mut rng = generators::rng();
        let blocks = random_block_range(&mut rng, 1..=last_block, params);
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let tx_senders: Vec<_> = blocks
            .iter()
            .flat_map(|block| &block.body().transactions)
            .enumerate()
            .map(|(tx_num, transaction)| {
                (tx_num as u64, transaction.recover_signer().expect("recover signer"))
            })
            .collect();
        let inserted = tx_senders.len();
        db.insert_transaction_senders(tx_senders).expect("insert transaction senders");
        inserted
    }

    #[test]
    fn prune_table_with_iterator_early_exit_does_not_overconsume() {
        let db = TestStageDB::default();
        let total = seed_transaction_senders(
            &db,
            3,
            BlockRangeParams {
                parent: Some(alloy_primitives::B256::ZERO),
                tx_count: 2..3,
                ..Default::default()
            },
        );

        let provider = db.factory.database_provider_rw().unwrap();

        let calls = Arc::new(AtomicUsize::new(0));
        let keys = CountingIter::new((0..total as u64).collect(), Arc::clone(&calls));
        let mut limiter = PruneLimiter::default().set_deleted_entries_limit(2);

        let (pruned, done) = provider
            .tx_ref()
            .prune_table_with_iterator::<tables::TransactionSenders>(keys, &mut limiter, |_| {})
            .expect("prune");

        assert_eq!(pruned, 2);
        assert!(!done);
        // One extra key is peeked to learn that more work remains before the limit stops us.
        assert_eq!(calls.load(Ordering::SeqCst), pruned + 1);

        provider.commit().expect("commit");
        assert_eq!(db.table::<tables::TransactionSenders>().unwrap().len(), total - 2);
    }

    #[test]
    fn prune_table_with_iterator_consumes_to_end_reports_done() {
        let db = TestStageDB::default();
        let total = seed_transaction_senders(
            &db,
            2,
            BlockRangeParams {
                parent: Some(alloy_primitives::B256::ZERO),
                tx_count: 1..2,
                ..Default::default()
            },
        );

        let provider = db.factory.database_provider_rw().unwrap();

        let calls = Arc::new(AtomicUsize::new(0));
        let keys = CountingIter::new((0..total as u64).collect(), Arc::clone(&calls));
        let mut limiter = PruneLimiter::default().set_deleted_entries_limit(usize::MAX);

        let (pruned, done) = provider
            .tx_ref()
            .prune_table_with_iterator::<tables::TransactionSenders>(keys, &mut limiter, |_| {})
            .expect("prune");

        assert_eq!(pruned, total);
        assert!(done);
        // Every key plus the final `None` that signals exhaustion.
        assert_eq!(calls.load(Ordering::SeqCst), total + 1);

        provider.commit().expect("commit");
        assert_eq!(db.table::<tables::TransactionSenders>().unwrap().len(), 0);
    }
}