// reth_prune/segments/user/sender_recovery.rs
1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment},
4 PrunerError,
5};
6use reth_db_api::{tables, transaction::DbTxMut};
7use reth_provider::{BlockReader, DBProvider, TransactionsProvider};
8use reth_prune_types::{
9 PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
10};
11use tracing::{instrument, trace};
12
13#[derive(Debug)]
14pub struct SenderRecovery {
15 mode: PruneMode,
16}
17
18impl SenderRecovery {
19 pub const fn new(mode: PruneMode) -> Self {
20 Self { mode }
21 }
22}
23
24impl<Provider> Segment<Provider> for SenderRecovery
25where
26 Provider: DBProvider<Tx: DbTxMut> + TransactionsProvider + BlockReader,
27{
28 fn segment(&self) -> PruneSegment {
29 PruneSegment::SenderRecovery
30 }
31
32 fn mode(&self) -> Option<PruneMode> {
33 Some(self.mode)
34 }
35
36 fn purpose(&self) -> PrunePurpose {
37 PrunePurpose::User
38 }
39
40 #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
41 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
42 let tx_range = match input.get_next_tx_num_range(provider)? {
43 Some(range) => range,
44 None => {
45 trace!(target: "pruner", "No transaction senders to prune");
46 return Ok(SegmentOutput::done())
47 }
48 };
49 let tx_range_end = *tx_range.end();
50
51 let mut limiter = input.limiter;
52
53 let mut last_pruned_transaction = tx_range_end;
54 let (pruned, done) =
55 provider.tx_ref().prune_table_with_range::<tables::TransactionSenders>(
56 tx_range,
57 &mut limiter,
58 |_| false,
59 |row| last_pruned_transaction = row.0,
60 )?;
61 trace!(target: "pruner", %pruned, %done, "Pruned transaction senders");
62
63 let last_pruned_block = provider
64 .transaction_block(last_pruned_transaction)?
65 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
66 .checked_sub(if done { 0 } else { 1 });
69
70 let progress = limiter.progress(done);
71
72 Ok(SegmentOutput {
73 progress,
74 pruned,
75 checkpoint: Some(SegmentOutputCheckpoint {
76 block_number: last_pruned_block,
77 tx_number: Some(last_pruned_transaction),
78 }),
79 })
80 }
81}
82
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, SenderRecovery};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use itertools::{
        FoldWhile::{Continue, Done},
        Itertools,
    };
    use reth_db_api::tables;
    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
    use std::ops::Sub;

    /// Runs the sender-recovery segment three times against the same database and checks,
    /// after every run, the reported progress, the number of pruned rows, the size of the
    /// `TransactionSenders` table and the persisted prune checkpoint.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Blocks 1 through 10. The expected counts at the bottom of the test (10 + 2 + 8)
        // rely on every block holding two transactions.
        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // One `(tx_number, sender)` pair per transaction, numbered consecutively from zero.
        let mut transaction_senders = Vec::new();
        for block in &blocks {
            transaction_senders.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                let tx_number = transaction_senders.len() as u64;
                let sender = transaction.recover_signer().expect("recover signer");
                transaction_senders.push((tx_number, sender));
            }
        }
        let transaction_senders_len = transaction_senders.len();
        db.insert_transaction_senders(transaction_senders).expect("insert transaction senders");

        // Sanity check: both tables hold one row per generated transaction.
        let stored_transactions = db.table::<tables::Transactions>().unwrap().len();
        let generated_transactions =
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>();
        assert_eq!(stored_transactions, generated_transactions);
        assert_eq!(stored_transactions, db.table::<tables::TransactionSenders>().unwrap().len());

        // Reads the checkpoint currently persisted for this segment.
        let stored_checkpoint = || {
            db.factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::SenderRecovery)
                .unwrap()
        };

        let test_prune = |to_block: BlockNumber, expected: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = SenderRecovery::new(prune_mode);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);

            // First transaction the run is expected to touch: one past the previous
            // checkpoint, or zero when there is no checkpoint yet.
            let next_tx_number_to_prune = stored_checkpoint()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map(|tx_number| tx_number + 1)
                .unwrap_or_default();

            let input = PruneInput {
                previous_checkpoint: stored_checkpoint(),
                to_block,
                limiter: limiter.clone(),
            };

            // The run is expected to stop either at the last transaction of the first
            // `to_block` blocks or once the deleted-entries limit is used up, whichever
            // comes first.
            let transactions_up_to_target = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum::<usize>();
            let transactions_within_limit =
                next_tx_number_to_prune as usize + input.limiter.deleted_entries_limit().unwrap();
            let last_pruned_tx_number =
                transactions_up_to_target.min(transactions_within_limit).sub(1);

            // Number of the block that contains `last_pruned_tx_number`.
            let (block_of_last_pruned_tx, _) = blocks
                .iter()
                .fold_while((0, 0), |(_, seen), block| {
                    let seen = seen + block.transaction_count();
                    let state = (block.number, seen);
                    if seen > last_pruned_tx_number {
                        Done(state)
                    } else {
                        Continue(state)
                    }
                })
                .into_inner();

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput { progress, pruned, checkpoint: Some(_) }
                    if (progress, pruned) == expected
            );

            let checkpoint_to_save = result.checkpoint.unwrap().as_prune_checkpoint(prune_mode);
            segment.save_checkpoint(&provider, checkpoint_to_save).unwrap();
            provider.commit().expect("commit");

            // An unfinished run is expected to checkpoint the block before the one that
            // holds the last pruned transaction.
            let expected_block_number = block_of_last_pruned_tx
                .checked_sub(if result.progress.is_finished() { 0 } else { 1 });

            let remaining_senders = db.table::<tables::TransactionSenders>().unwrap().len();
            assert_eq!(remaining_senders, transaction_senders_len - (last_pruned_tx_number + 1));
            assert_eq!(
                stored_checkpoint(),
                Some(PruneCheckpoint {
                    block_number: expected_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode,
                })
            );
        };

        // First run hits the limit of 10 deleted entries, the second run finishes the
        // remaining 2 entries for target block 6, the third prunes the remaining 8.
        let limit_reached = PruneProgress::HasMoreData(
            reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
        );
        test_prune(6, (limit_reached, 10));
        test_prune(6, (PruneProgress::Finished, 2));
        test_prune(10, (PruneProgress::Finished, 8));
    }
}