Skip to main content

reth_prune/segments/user/
sender_recovery.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{self, PruneInput, Segment},
4    PrunerError,
5};
6use reth_db_api::{tables, transaction::DbTxMut};
7use reth_provider::{
8    BlockReader, DBProvider, EitherWriterDestination, StaticFileProviderFactory,
9    StorageSettingsCache, TransactionsProvider,
10};
11use reth_prune_types::{
12    PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
13};
14use reth_static_file_types::StaticFileSegment;
15use tracing::{debug, instrument, trace};
16
/// Prune segment for recovered transaction senders.
///
/// Its [`Segment`] implementation reports [`PruneSegment::SenderRecovery`] and removes
/// transaction sender entries according to the configured [`PruneMode`].
#[derive(Debug)]
pub struct SenderRecovery {
    /// Prune mode supplied at construction. It is returned from `Segment::mode` and checked
    /// with `is_full()` while pruning.
    mode: PruneMode,
}
21
impl SenderRecovery {
    /// Creates a new [`SenderRecovery`] segment that stores the given prune `mode`.
    pub const fn new(mode: PruneMode) -> Self {
        Self { mode }
    }
}
27
28impl<Provider> Segment<Provider> for SenderRecovery
29where
30    Provider: DBProvider<Tx: DbTxMut>
31        + TransactionsProvider
32        + BlockReader
33        + StorageSettingsCache
34        + StaticFileProviderFactory,
35{
36    fn segment(&self) -> PruneSegment {
37        PruneSegment::SenderRecovery
38    }
39
40    fn mode(&self) -> Option<PruneMode> {
41        Some(self.mode)
42    }
43
44    fn purpose(&self) -> PrunePurpose {
45        PrunePurpose::User
46    }
47
48    #[instrument(
49        name = "SenderRecovery::prune",
50        target = "pruner",
51        skip(self, provider),
52        ret(level = "trace")
53    )]
54    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
55        if EitherWriterDestination::senders(provider).is_static_file() {
56            debug!(target: "pruner", "Pruning transaction senders from static files.");
57
58            if self.mode.is_full() {
59                debug!(target: "pruner", "PruneMode::Full: deleting all transaction senders static files.");
60                return segments::delete_static_files_segment(
61                    provider,
62                    input,
63                    StaticFileSegment::TransactionSenders,
64                )
65            }
66
67            return segments::prune_static_files(
68                provider,
69                input,
70                StaticFileSegment::TransactionSenders,
71            )
72        }
73        debug!(target: "pruner", "Pruning transaction senders from database.");
74
75        let tx_range = match input.get_next_tx_num_range(provider)? {
76            Some(range) => range,
77            None => {
78                trace!(target: "pruner", "No transaction senders to prune");
79                return Ok(SegmentOutput::done())
80            }
81        };
82        let tx_range_end = *tx_range.end();
83
84        // For PruneMode::Full, clear the entire table in one operation
85        if self.mode.is_full() {
86            let pruned = provider.tx_ref().clear_table::<tables::TransactionSenders>()?;
87            trace!(target: "pruner", %pruned, "Cleared transaction senders table");
88
89            let last_pruned_block = provider
90                .block_by_transaction_id(tx_range_end)?
91                .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;
92
93            return Ok(SegmentOutput {
94                progress: PruneProgress::Finished,
95                pruned,
96                checkpoint: Some(SegmentOutputCheckpoint {
97                    block_number: Some(last_pruned_block),
98                    tx_number: Some(tx_range_end),
99                }),
100            });
101        }
102
103        let mut limiter = input.limiter;
104
105        let mut last_pruned_transaction = tx_range_end;
106        let (pruned, done) =
107            provider.tx_ref().prune_table_with_range::<tables::TransactionSenders>(
108                tx_range,
109                &mut limiter,
110                |_| false,
111                |row| last_pruned_transaction = row.0,
112            )?;
113        trace!(target: "pruner", %pruned, %done, "Pruned transaction senders");
114
115        let last_pruned_block = provider
116            .block_by_transaction_id(last_pruned_transaction)?
117            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
118            // If there's more transaction senders to prune, set the checkpoint block number to
119            // previous, so we could finish pruning its transaction senders on the next run.
120            .checked_sub(if done { 0 } else { 1 });
121
122        let progress = limiter.progress(done);
123
124        Ok(SegmentOutput {
125            progress,
126            pruned,
127            checkpoint: Some(SegmentOutputCheckpoint {
128                block_number: last_pruned_block,
129                tx_number: Some(last_pruned_transaction),
130            }),
131        })
132    }
133}
134
// Tests for `SenderRecovery::prune`, asserted against the `TransactionSenders` database table
// and the stored prune checkpoint.
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, SenderRecovery};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use itertools::{
        FoldWhile::{Continue, Done},
        Itertools,
    };
    use reth_db_api::tables;
    use reth_primitives_traits::SignerRecoverable;
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
    use std::ops::Sub;

    /// Prunes senders in three consecutive runs with a per-run limit of 10 deleted entries and
    /// checks the progress, the number of pruned rows, the remaining table size and the saved
    /// checkpoint after each run.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Blocks 1..=10. `tx_count: 2..3` presumably yields exactly two transactions per block,
        // which matches the expected counts at the bottom of the test (12 senders up to block 6,
        // 20 in total) -- TODO confirm against `random_block_range`.
        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Build `(tx_number, sender)` pairs; transaction numbers are assigned sequentially from
        // 0 in block order.
        let mut transaction_senders = Vec::new();
        for block in &blocks {
            transaction_senders.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                transaction_senders.push((
                    transaction_senders.len() as u64,
                    transaction.recover_signer().expect("recover signer"),
                ));
            }
        }
        let transaction_senders_len = transaction_senders.len();
        db.insert_transaction_senders(transaction_senders).expect("insert transaction senders");

        // Sanity checks: every inserted transaction is stored and has a matching sender row.
        assert_eq!(
            db.table::<tables::Transactions>().unwrap().len(),
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
        );
        assert_eq!(
            db.table::<tables::Transactions>().unwrap().len(),
            db.table::<tables::TransactionSenders>().unwrap().len()
        );

        // Runs one prune pass up to `to_block` and verifies it against
        // `expected_result = (progress, pruned row count)`.
        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = SenderRecovery::new(prune_mode);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::SenderRecovery)
                    .unwrap(),
                to_block,
                limiter: limiter.clone(),
            };

            // First transaction this run is expected to touch: one past the checkpointed
            // transaction, or 0 when no checkpoint (or no tx number) is stored yet.
            let next_tx_number_to_prune = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::SenderRecovery)
                .unwrap()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map(|tx_number| tx_number + 1)
                .unwrap_or_default();

            // Expected last pruned transaction: the smaller of "all transactions in the first
            // `to_block` blocks" and "start + deleted-entries limit", converted from a count to
            // a zero-based transaction number.
            let last_pruned_tx_number = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum::<usize>()
                .min(
                    next_tx_number_to_prune as usize +
                        input.limiter.deleted_entries_limit().unwrap(),
                )
                .sub(1);

            // Number of the first block whose cumulative transaction count exceeds
            // `last_pruned_tx_number`, i.e. the block containing that transaction.
            let last_pruned_block_number = blocks
                .iter()
                .fold_while((0, 0), |(_, mut tx_count), block| {
                    tx_count += block.transaction_count();

                    if tx_count > last_pruned_tx_number {
                        Done((block.number, tx_count))
                    } else {
                        Continue((block.number, tx_count))
                    }
                })
                .into_inner()
                .0;

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            // Record the deletions on the local limiter (the segment received a clone).
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            // Persist the checkpoint so the next `test_prune` call resumes from it.
            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // An unfinished run is expected to checkpoint the previous block (`None` if there is
            // no previous block).
            let last_pruned_block_number = last_pruned_block_number
                .checked_sub(if result.progress.is_finished() { 0 } else { 1 });

            // Every sender up to and including `last_pruned_tx_number` must be gone.
            assert_eq!(
                db.table::<tables::TransactionSenders>().unwrap().len(),
                transaction_senders_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::SenderRecovery)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        // Run 1: pruning up to block 6 stops at the limit of 10 deleted entries.
        test_prune(
            6,
            (
                PruneProgress::HasMoreData(
                    reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
                ),
                10,
            ),
        );
        // Run 2: the remaining 2 senders up to block 6 are pruned and the run finishes.
        test_prune(6, (PruneProgress::Finished, 2));
        // Run 3: raising the target to block 10 prunes the remaining 8 senders.
        test_prune(10, (PruneProgress::Finished, 8));
    }
}