reth_prune/segments/user/
sender_recovery.rs1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment},
4 PrunerError,
5};
6use reth_db_api::{tables, transaction::DbTxMut};
7use reth_provider::{BlockReader, DBProvider, TransactionsProvider};
8use reth_prune_types::{
9 PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
10};
11use tracing::{instrument, trace};
12
13#[derive(Debug)]
14pub struct SenderRecovery {
15 mode: PruneMode,
16}
17
18impl SenderRecovery {
19 pub const fn new(mode: PruneMode) -> Self {
20 Self { mode }
21 }
22}
23
24impl<Provider> Segment<Provider> for SenderRecovery
25where
26 Provider: DBProvider<Tx: DbTxMut> + TransactionsProvider + BlockReader,
27{
28 fn segment(&self) -> PruneSegment {
29 PruneSegment::SenderRecovery
30 }
31
32 fn mode(&self) -> Option<PruneMode> {
33 Some(self.mode)
34 }
35
36 fn purpose(&self) -> PrunePurpose {
37 PrunePurpose::User
38 }
39
40 #[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
41 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
42 let tx_range = match input.get_next_tx_num_range(provider)? {
43 Some(range) => range,
44 None => {
45 trace!(target: "pruner", "No transaction senders to prune");
46 return Ok(SegmentOutput::done())
47 }
48 };
49 let tx_range_end = *tx_range.end();
50
51 if self.mode.is_full() {
53 let pruned = provider.tx_ref().clear_table::<tables::TransactionSenders>()?;
54 trace!(target: "pruner", %pruned, "Cleared transaction senders table");
55
56 let last_pruned_block = provider
57 .block_by_transaction_id(tx_range_end)?
58 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;
59
60 return Ok(SegmentOutput {
61 progress: PruneProgress::Finished,
62 pruned,
63 checkpoint: Some(SegmentOutputCheckpoint {
64 block_number: Some(last_pruned_block),
65 tx_number: Some(tx_range_end),
66 }),
67 });
68 }
69
70 let mut limiter = input.limiter;
71
72 let mut last_pruned_transaction = tx_range_end;
73 let (pruned, done) =
74 provider.tx_ref().prune_table_with_range::<tables::TransactionSenders>(
75 tx_range,
76 &mut limiter,
77 |_| false,
78 |row| last_pruned_transaction = row.0,
79 )?;
80 trace!(target: "pruner", %pruned, %done, "Pruned transaction senders");
81
82 let last_pruned_block = provider
83 .block_by_transaction_id(last_pruned_transaction)?
84 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
85 .checked_sub(if done { 0 } else { 1 });
88
89 let progress = limiter.progress(done);
90
91 Ok(SegmentOutput {
92 progress,
93 pruned,
94 checkpoint: Some(SegmentOutputCheckpoint {
95 block_number: last_pruned_block,
96 tx_number: Some(last_pruned_transaction),
97 }),
98 })
99 }
100}
101
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, SenderRecovery};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::tables;
    use reth_primitives_traits::SignerRecoverable;
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    /// Inserts ten random blocks (two transactions each) together with their recovered senders,
    /// then runs the segment three times with a deleted-entries limit of 10, checking the
    /// reported progress, the remaining table size and the persisted checkpoint after each run.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Sender entries are keyed by the running transaction index across all blocks.
        let transaction_senders: Vec<_> = blocks
            .iter()
            .flat_map(|block| block.body().transactions.iter())
            .enumerate()
            .map(|(index, transaction)| {
                (index as u64, transaction.recover_signer().expect("recover signer"))
            })
            .collect();
        let transaction_senders_len = transaction_senders.len();
        db.insert_transaction_senders(transaction_senders).expect("insert transaction senders");

        // Sanity check: both tables hold one row per generated transaction.
        let stored_transactions = db.table::<tables::Transactions>().unwrap().len();
        assert_eq!(
            stored_transactions,
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
        );
        assert_eq!(stored_transactions, db.table::<tables::TransactionSenders>().unwrap().len());

        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            // Reads the currently persisted checkpoint of this segment.
            let read_checkpoint = || {
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::SenderRecovery)
                    .unwrap()
            };

            let prune_mode = PruneMode::Before(to_block);
            let segment = SenderRecovery::new(prune_mode);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
            let input = PruneInput {
                previous_checkpoint: read_checkpoint(),
                to_block,
                limiter: limiter.clone(),
            };

            // First transaction not covered by the previous checkpoint (0 when there is none).
            let next_tx_number_to_prune = read_checkpoint()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map_or(0, |tx_number| tx_number + 1);

            // Expected last pruned transaction: bounded both by the target block and by the
            // limiter's deleted-entries budget.
            let txs_up_to_target: usize = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum();
            let limit_bound =
                next_tx_number_to_prune as usize + input.limiter.deleted_entries_limit().unwrap();
            let last_pruned_tx_number = txs_up_to_target.min(limit_bound) - 1;

            // Walk the blocks until the cumulative transaction count passes the expected last
            // pruned transaction; that block is the one containing it.
            let mut last_pruned_block_number: BlockNumber = 0;
            let mut seen_txs = 0;
            for block in &blocks {
                seen_txs += block.transaction_count();
                last_pruned_block_number = block.number;
                if seen_txs > last_pruned_tx_number {
                    break;
                }
            }

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput { progress, pruned, checkpoint: Some(_) }
                    if (progress, pruned) == expected_result
            );

            let new_checkpoint = result.checkpoint.unwrap().as_prune_checkpoint(prune_mode);
            segment.save_checkpoint(&provider, new_checkpoint).unwrap();
            provider.commit().expect("commit");

            // Mirrors the segment: an unfinished run reports the previous block.
            let rewind = if result.progress.is_finished() { 0 } else { 1 };
            let last_pruned_block_number = last_pruned_block_number.checked_sub(rewind);

            assert_eq!(
                db.table::<tables::TransactionSenders>().unwrap().len(),
                transaction_senders_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                read_checkpoint(),
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        let limit_reached = PruneProgress::HasMoreData(
            reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
        );
        test_prune(6, (limit_reached, 10));
        test_prune(6, (PruneProgress::Finished, 2));
        test_prune(10, (PruneProgress::Finished, 8));
    }
}