reth_prune/segments/user/
sender_recovery.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{PruneInput, Segment},
4    PrunerError,
5};
6use reth_db_api::{tables, transaction::DbTxMut};
7use reth_provider::{BlockReader, DBProvider, TransactionsProvider};
8use reth_prune_types::{
9    PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
10};
11use tracing::{instrument, trace};
12
13#[derive(Debug)]
14pub struct SenderRecovery {
15    mode: PruneMode,
16}
17
18impl SenderRecovery {
19    pub const fn new(mode: PruneMode) -> Self {
20        Self { mode }
21    }
22}
23
24impl<Provider> Segment<Provider> for SenderRecovery
25where
26    Provider: DBProvider<Tx: DbTxMut> + TransactionsProvider + BlockReader,
27{
28    fn segment(&self) -> PruneSegment {
29        PruneSegment::SenderRecovery
30    }
31
32    fn mode(&self) -> Option<PruneMode> {
33        Some(self.mode)
34    }
35
36    fn purpose(&self) -> PrunePurpose {
37        PrunePurpose::User
38    }
39
40    #[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
41    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
42        let tx_range = match input.get_next_tx_num_range(provider)? {
43            Some(range) => range,
44            None => {
45                trace!(target: "pruner", "No transaction senders to prune");
46                return Ok(SegmentOutput::done())
47            }
48        };
49        let tx_range_end = *tx_range.end();
50
51        // For PruneMode::Full, clear the entire table in one operation
52        if self.mode.is_full() {
53            let pruned = provider.tx_ref().clear_table::<tables::TransactionSenders>()?;
54            trace!(target: "pruner", %pruned, "Cleared transaction senders table");
55
56            let last_pruned_block = provider
57                .block_by_transaction_id(tx_range_end)?
58                .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;
59
60            return Ok(SegmentOutput {
61                progress: PruneProgress::Finished,
62                pruned,
63                checkpoint: Some(SegmentOutputCheckpoint {
64                    block_number: Some(last_pruned_block),
65                    tx_number: Some(tx_range_end),
66                }),
67            });
68        }
69
70        let mut limiter = input.limiter;
71
72        let mut last_pruned_transaction = tx_range_end;
73        let (pruned, done) =
74            provider.tx_ref().prune_table_with_range::<tables::TransactionSenders>(
75                tx_range,
76                &mut limiter,
77                |_| false,
78                |row| last_pruned_transaction = row.0,
79            )?;
80        trace!(target: "pruner", %pruned, %done, "Pruned transaction senders");
81
82        let last_pruned_block = provider
83            .block_by_transaction_id(last_pruned_transaction)?
84            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
85            // If there's more transaction senders to prune, set the checkpoint block number to
86            // previous, so we could finish pruning its transaction senders on the next run.
87            .checked_sub(if done { 0 } else { 1 });
88
89        let progress = limiter.progress(done);
90
91        Ok(SegmentOutput {
92            progress,
93            pruned,
94            checkpoint: Some(SegmentOutputCheckpoint {
95                block_number: last_pruned_block,
96                tx_number: Some(last_pruned_transaction),
97            }),
98        })
99    }
100}
101
102#[cfg(test)]
103mod tests {
104    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, SenderRecovery};
105    use alloy_primitives::{BlockNumber, TxNumber, B256};
106    use assert_matches::assert_matches;
107    use itertools::{
108        FoldWhile::{Continue, Done},
109        Itertools,
110    };
111    use reth_db_api::tables;
112    use reth_primitives_traits::SignerRecoverable;
113    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
114    use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
115    use reth_stages::test_utils::{StorageKind, TestStageDB};
116    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
117    use std::ops::Sub;
118
119    #[test]
120    fn prune() {
121        let db = TestStageDB::default();
122        let mut rng = generators::rng();
123
124        let blocks = random_block_range(
125            &mut rng,
126            1..=10,
127            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
128        );
129        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
130
131        let mut transaction_senders = Vec::new();
132        for block in &blocks {
133            transaction_senders.reserve_exact(block.transaction_count());
134            for transaction in &block.body().transactions {
135                transaction_senders.push((
136                    transaction_senders.len() as u64,
137                    transaction.recover_signer().expect("recover signer"),
138                ));
139            }
140        }
141        let transaction_senders_len = transaction_senders.len();
142        db.insert_transaction_senders(transaction_senders).expect("insert transaction senders");
143
144        assert_eq!(
145            db.table::<tables::Transactions>().unwrap().len(),
146            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
147        );
148        assert_eq!(
149            db.table::<tables::Transactions>().unwrap().len(),
150            db.table::<tables::TransactionSenders>().unwrap().len()
151        );
152
153        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
154            let prune_mode = PruneMode::Before(to_block);
155            let segment = SenderRecovery::new(prune_mode);
156            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
157            let input = PruneInput {
158                previous_checkpoint: db
159                    .factory
160                    .provider()
161                    .unwrap()
162                    .get_prune_checkpoint(PruneSegment::SenderRecovery)
163                    .unwrap(),
164                to_block,
165                limiter: limiter.clone(),
166            };
167
168            let next_tx_number_to_prune = db
169                .factory
170                .provider()
171                .unwrap()
172                .get_prune_checkpoint(PruneSegment::SenderRecovery)
173                .unwrap()
174                .and_then(|checkpoint| checkpoint.tx_number)
175                .map(|tx_number| tx_number + 1)
176                .unwrap_or_default();
177
178            let last_pruned_tx_number = blocks
179                .iter()
180                .take(to_block as usize)
181                .map(|block| block.transaction_count())
182                .sum::<usize>()
183                .min(
184                    next_tx_number_to_prune as usize +
185                        input.limiter.deleted_entries_limit().unwrap(),
186                )
187                .sub(1);
188
189            let last_pruned_block_number = blocks
190                .iter()
191                .fold_while((0, 0), |(_, mut tx_count), block| {
192                    tx_count += block.transaction_count();
193
194                    if tx_count > last_pruned_tx_number {
195                        Done((block.number, tx_count))
196                    } else {
197                        Continue((block.number, tx_count))
198                    }
199                })
200                .into_inner()
201                .0;
202
203            let provider = db.factory.database_provider_rw().unwrap();
204            let result = segment.prune(&provider, input).unwrap();
205            limiter.increment_deleted_entries_count_by(result.pruned);
206
207            assert_matches!(
208                result,
209                SegmentOutput {progress, pruned, checkpoint: Some(_)}
210                    if (progress, pruned) == expected_result
211            );
212
213            segment
214                .save_checkpoint(
215                    &provider,
216                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
217                )
218                .unwrap();
219            provider.commit().expect("commit");
220
221            let last_pruned_block_number = last_pruned_block_number
222                .checked_sub(if result.progress.is_finished() { 0 } else { 1 });
223
224            assert_eq!(
225                db.table::<tables::TransactionSenders>().unwrap().len(),
226                transaction_senders_len - (last_pruned_tx_number + 1)
227            );
228            assert_eq!(
229                db.factory
230                    .provider()
231                    .unwrap()
232                    .get_prune_checkpoint(PruneSegment::SenderRecovery)
233                    .unwrap(),
234                Some(PruneCheckpoint {
235                    block_number: last_pruned_block_number,
236                    tx_number: Some(last_pruned_tx_number as TxNumber),
237                    prune_mode
238                })
239            );
240        };
241
242        test_prune(
243            6,
244            (
245                PruneProgress::HasMoreData(
246                    reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
247                ),
248                10,
249            ),
250        );
251        test_prune(6, (PruneProgress::Finished, 2));
252        test_prune(10, (PruneProgress::Finished, 8));
253    }
254}