reth_prune/
db_ext.rs

1use crate::PruneLimiter;
2use reth_db_api::{
3    cursor::{DbCursorRO, DbCursorRW, RangeWalker},
4    table::{Table, TableRow},
5    transaction::DbTxMut,
6    DatabaseError,
7};
8use std::{fmt::Debug, ops::RangeBounds};
9use tracing::debug;
10
11pub(crate) trait DbTxPruneExt: DbTxMut {
12    /// Prune the table for the specified pre-sorted key iterator.
13    ///
14    /// Returns number of rows pruned.
15    fn prune_table_with_iterator<T: Table>(
16        &self,
17        keys: impl IntoIterator<Item = T::Key>,
18        limiter: &mut PruneLimiter,
19        mut delete_callback: impl FnMut(TableRow<T>),
20    ) -> Result<(usize, bool), DatabaseError> {
21        let mut cursor = self.cursor_write::<T>()?;
22        let mut keys = keys.into_iter();
23
24        let mut deleted_entries = 0;
25
26        for key in &mut keys {
27            if limiter.is_limit_reached() {
28                debug!(
29                    target: "providers::db",
30                    ?limiter,
31                    deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
32                    time_limit = %limiter.is_time_limit_reached(),
33                    table = %T::NAME,
34                    "Pruning limit reached"
35                );
36                break
37            }
38
39            let row = cursor.seek_exact(key)?;
40            if let Some(row) = row {
41                cursor.delete_current()?;
42                limiter.increment_deleted_entries_count();
43                deleted_entries += 1;
44                delete_callback(row);
45            }
46        }
47
48        let done = keys.next().is_none();
49        Ok((deleted_entries, done))
50    }
51
52    /// Prune the table for the specified key range.
53    ///
54    /// Returns number of rows pruned.
55    fn prune_table_with_range<T: Table>(
56        &self,
57        keys: impl RangeBounds<T::Key> + Clone + Debug,
58        limiter: &mut PruneLimiter,
59        mut skip_filter: impl FnMut(&TableRow<T>) -> bool,
60        mut delete_callback: impl FnMut(TableRow<T>),
61    ) -> Result<(usize, bool), DatabaseError> {
62        let mut cursor = self.cursor_write::<T>()?;
63        let mut walker = cursor.walk_range(keys)?;
64
65        let mut deleted_entries = 0;
66
67        let done = loop {
68            // check for time out must be done in this scope since it's not done in
69            // `prune_table_with_range_step`
70            if limiter.is_limit_reached() {
71                debug!(
72                    target: "providers::db",
73                    ?limiter,
74                    deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
75                    time_limit = %limiter.is_time_limit_reached(),
76                    table = %T::NAME,
77                    "Pruning limit reached"
78                );
79                break false
80            }
81
82            let done = self.prune_table_with_range_step(
83                &mut walker,
84                limiter,
85                &mut skip_filter,
86                &mut delete_callback,
87            )?;
88
89            if done {
90                break true
91            }
92            deleted_entries += 1;
93        };
94
95        Ok((deleted_entries, done))
96    }
97
98    /// Steps once with the given walker and prunes the entry in the table.
99    ///
100    /// Returns `true` if the walker is finished, `false` if it may have more data to prune.
101    ///
102    /// CAUTION: Pruner limits are not checked. This allows for a clean exit of a prune run that's
103    /// pruning different tables concurrently, by letting them step to the same height before
104    /// timing out.
105    fn prune_table_with_range_step<T: Table>(
106        &self,
107        walker: &mut RangeWalker<'_, T, Self::CursorMut<T>>,
108        limiter: &mut PruneLimiter,
109        skip_filter: &mut impl FnMut(&TableRow<T>) -> bool,
110        delete_callback: &mut impl FnMut(TableRow<T>),
111    ) -> Result<bool, DatabaseError> {
112        let Some(res) = walker.next() else { return Ok(true) };
113
114        let row = res?;
115
116        if !skip_filter(&row) {
117            walker.delete_current()?;
118            limiter.increment_deleted_entries_count();
119            delete_callback(row);
120        }
121
122        Ok(false)
123    }
124}
125
126impl<Tx> DbTxPruneExt for Tx where Tx: DbTxMut {}