reth_prune/segments/user/
sender_recovery.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{PruneInput, Segment},
4    PrunerError,
5};
6use reth_db_api::{tables, transaction::DbTxMut};
7use reth_provider::{BlockReader, DBProvider, TransactionsProvider};
8use reth_prune_types::{
9    PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
10};
11use tracing::{instrument, trace};
12
13#[derive(Debug)]
14pub struct SenderRecovery {
15    mode: PruneMode,
16}
17
18impl SenderRecovery {
19    pub const fn new(mode: PruneMode) -> Self {
20        Self { mode }
21    }
22}
23
24impl<Provider> Segment<Provider> for SenderRecovery
25where
26    Provider: DBProvider<Tx: DbTxMut> + TransactionsProvider + BlockReader,
27{
28    fn segment(&self) -> PruneSegment {
29        PruneSegment::SenderRecovery
30    }
31
32    fn mode(&self) -> Option<PruneMode> {
33        Some(self.mode)
34    }
35
36    fn purpose(&self) -> PrunePurpose {
37        PrunePurpose::User
38    }
39
40    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
41    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
42        let tx_range = match input.get_next_tx_num_range(provider)? {
43            Some(range) => range,
44            None => {
45                trace!(target: "pruner", "No transaction senders to prune");
46                return Ok(SegmentOutput::done())
47            }
48        };
49        let tx_range_end = *tx_range.end();
50
51        let mut limiter = input.limiter;
52
53        let mut last_pruned_transaction = tx_range_end;
54        let (pruned, done) =
55            provider.tx_ref().prune_table_with_range::<tables::TransactionSenders>(
56                tx_range,
57                &mut limiter,
58                |_| false,
59                |row| last_pruned_transaction = row.0,
60            )?;
61        trace!(target: "pruner", %pruned, %done, "Pruned transaction senders");
62
63        let last_pruned_block = provider
64            .transaction_block(last_pruned_transaction)?
65            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
66            // If there's more transaction senders to prune, set the checkpoint block number to
67            // previous, so we could finish pruning its transaction senders on the next run.
68            .checked_sub(if done { 0 } else { 1 });
69
70        let progress = limiter.progress(done);
71
72        Ok(SegmentOutput {
73            progress,
74            pruned,
75            checkpoint: Some(SegmentOutputCheckpoint {
76                block_number: last_pruned_block,
77                tx_number: Some(last_pruned_transaction),
78            }),
79        })
80    }
81}
82
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, SenderRecovery};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::tables;
    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    /// Runs the sender recovery segment three times against a test database and checks, after
    /// every run, the reported progress, the number of pruned entries, the remaining table
    /// size and the saved prune checkpoint.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // One `(transaction number, signer)` pair per transaction, numbered from zero in block
        // order.
        let mut transaction_senders = Vec::new();
        for block in &blocks {
            let first_tx_number = transaction_senders.len() as u64;
            transaction_senders.extend(
                block.body().transactions.iter().zip(first_tx_number..).map(
                    |(transaction, tx_number)| {
                        (tx_number, transaction.recover_signer().expect("recover signer"))
                    },
                ),
            );
        }
        let transaction_senders_len = transaction_senders.len();
        db.insert_transaction_senders(transaction_senders).expect("insert transaction senders");

        // Sanity check: every transaction of every block is stored and has a sender entry.
        let stored_transactions = db.table::<tables::Transactions>().unwrap().len();
        assert_eq!(
            stored_transactions,
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
        );
        assert_eq!(stored_transactions, db.table::<tables::TransactionSenders>().unwrap().len());

        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = SenderRecovery::new(prune_mode);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);

            // The checkpoint saved by the previous run (if any) tells where this run resumes.
            let previous_checkpoint = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::SenderRecovery)
                .unwrap();
            let next_tx_number_to_prune = previous_checkpoint
                .as_ref()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map_or(0, |tx_number| tx_number + 1);

            let input = PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() };

            // The run is expected to stop at whichever comes first: the last transaction of
            // the first `to_block` blocks, or the deleted entries limit.
            let txs_up_to_target = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum::<usize>();
            let limit_bound =
                next_tx_number_to_prune as usize + input.limiter.deleted_entries_limit().unwrap();
            let last_pruned_tx_number = txs_up_to_target.min(limit_bound) - 1;

            // Number of the first block whose cumulative transaction count exceeds the last
            // pruned transaction number; the last block if none does.
            let mut txs_seen = 0;
            let mut last_pruned_block_number = 0;
            for block in &blocks {
                txs_seen += block.transaction_count();
                last_pruned_block_number = block.number;
                if txs_seen > last_pruned_tx_number {
                    break
                }
            }

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            let checkpoint = result.checkpoint.unwrap().as_prune_checkpoint(prune_mode);
            segment.save_checkpoint(&provider, checkpoint).unwrap();
            provider.commit().expect("commit");

            // An unfinished run is expected to report the previous block in its checkpoint.
            let expected_checkpoint_block = if result.progress.is_finished() {
                Some(last_pruned_block_number)
            } else {
                last_pruned_block_number.checked_sub(1)
            };

            assert_eq!(
                db.table::<tables::TransactionSenders>().unwrap().len(),
                transaction_senders_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::SenderRecovery)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: expected_checkpoint_block,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        // Assuming `tx_count: 2..3` yields two transactions per block, blocks 1..=6 hold 12
        // senders: the first run stops at the limit of 10, the second removes the remaining 2,
        // and the third removes the 8 senders of blocks 7..=10.
        let limit_reached = PruneProgress::HasMoreData(
            reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
        );
        test_prune(6, (limit_reached, 10));
        test_prune(6, (PruneProgress::Finished, 2));
        test_prune(10, (PruneProgress::Finished, 8));
    }
}