reth_prune/segments/user/
sender_recovery.rs1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{self, PruneInput, Segment},
4 PrunerError,
5};
6use reth_db_api::{tables, transaction::DbTxMut};
7use reth_provider::{
8 BlockReader, DBProvider, EitherWriterDestination, StaticFileProviderFactory,
9 StorageSettingsCache, TransactionsProvider,
10};
11use reth_prune_types::{
12 PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
13};
14use reth_static_file_types::StaticFileSegment;
15use tracing::{debug, instrument, trace};
16
17#[derive(Debug)]
18pub struct SenderRecovery {
19 mode: PruneMode,
20}
21
22impl SenderRecovery {
23 pub const fn new(mode: PruneMode) -> Self {
24 Self { mode }
25 }
26}
27
28impl<Provider> Segment<Provider> for SenderRecovery
29where
30 Provider: DBProvider<Tx: DbTxMut>
31 + TransactionsProvider
32 + BlockReader
33 + StorageSettingsCache
34 + StaticFileProviderFactory,
35{
36 fn segment(&self) -> PruneSegment {
37 PruneSegment::SenderRecovery
38 }
39
40 fn mode(&self) -> Option<PruneMode> {
41 Some(self.mode)
42 }
43
44 fn purpose(&self) -> PrunePurpose {
45 PrunePurpose::User
46 }
47
48 #[instrument(
49 name = "SenderRecovery::prune",
50 target = "pruner",
51 skip(self, provider),
52 ret(level = "trace")
53 )]
54 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
55 if EitherWriterDestination::senders(provider).is_static_file() {
56 debug!(target: "pruner", "Pruning transaction senders from static files.");
57
58 if self.mode.is_full() {
59 debug!(target: "pruner", "PruneMode::Full: deleting all transaction senders static files.");
60 return segments::delete_static_files_segment(
61 provider,
62 input,
63 StaticFileSegment::TransactionSenders,
64 )
65 }
66
67 return segments::prune_static_files(
68 provider,
69 input,
70 StaticFileSegment::TransactionSenders,
71 )
72 }
73 debug!(target: "pruner", "Pruning transaction senders from database.");
74
75 let tx_range = match input.get_next_tx_num_range(provider)? {
76 Some(range) => range,
77 None => {
78 trace!(target: "pruner", "No transaction senders to prune");
79 return Ok(SegmentOutput::done())
80 }
81 };
82 let tx_range_end = *tx_range.end();
83
84 if self.mode.is_full() {
86 let pruned = provider.tx_ref().clear_table::<tables::TransactionSenders>()?;
87 trace!(target: "pruner", %pruned, "Cleared transaction senders table");
88
89 let last_pruned_block = provider
90 .block_by_transaction_id(tx_range_end)?
91 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;
92
93 return Ok(SegmentOutput {
94 progress: PruneProgress::Finished,
95 pruned,
96 checkpoint: Some(SegmentOutputCheckpoint {
97 block_number: Some(last_pruned_block),
98 tx_number: Some(tx_range_end),
99 }),
100 });
101 }
102
103 let mut limiter = input.limiter;
104
105 let mut last_pruned_transaction = tx_range_end;
106 let (pruned, done) =
107 provider.tx_ref().prune_table_with_range::<tables::TransactionSenders>(
108 tx_range,
109 &mut limiter,
110 |_| false,
111 |row| last_pruned_transaction = row.0,
112 )?;
113 trace!(target: "pruner", %pruned, %done, "Pruned transaction senders");
114
115 let last_pruned_block = provider
116 .block_by_transaction_id(last_pruned_transaction)?
117 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
118 .checked_sub(if done { 0 } else { 1 });
121
122 let progress = limiter.progress(done);
123
124 Ok(SegmentOutput {
125 progress,
126 pruned,
127 checkpoint: Some(SegmentOutputCheckpoint {
128 block_number: last_pruned_block,
129 tx_number: Some(last_pruned_transaction),
130 }),
131 })
132 }
133}
134
135#[cfg(test)]
136mod tests {
137 use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, SenderRecovery};
138 use alloy_primitives::{BlockNumber, TxNumber, B256};
139 use assert_matches::assert_matches;
140 use itertools::{
141 FoldWhile::{Continue, Done},
142 Itertools,
143 };
144 use reth_db_api::tables;
145 use reth_primitives_traits::SignerRecoverable;
146 use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
147 use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
148 use reth_stages::test_utils::{StorageKind, TestStageDB};
149 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
150 use std::ops::Sub;
151
152 #[test]
153 fn prune() {
154 let db = TestStageDB::default();
155 let mut rng = generators::rng();
156
157 let blocks = random_block_range(
158 &mut rng,
159 1..=10,
160 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
161 );
162 db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
163
164 let mut transaction_senders = Vec::new();
165 for block in &blocks {
166 transaction_senders.reserve_exact(block.transaction_count());
167 for transaction in &block.body().transactions {
168 transaction_senders.push((
169 transaction_senders.len() as u64,
170 transaction.recover_signer().expect("recover signer"),
171 ));
172 }
173 }
174 let transaction_senders_len = transaction_senders.len();
175 db.insert_transaction_senders(transaction_senders).expect("insert transaction senders");
176
177 assert_eq!(
178 db.table::<tables::Transactions>().unwrap().len(),
179 blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
180 );
181 assert_eq!(
182 db.table::<tables::Transactions>().unwrap().len(),
183 db.table::<tables::TransactionSenders>().unwrap().len()
184 );
185
186 let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
187 let prune_mode = PruneMode::Before(to_block);
188 let segment = SenderRecovery::new(prune_mode);
189 let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
190 let input = PruneInput {
191 previous_checkpoint: db
192 .factory
193 .provider()
194 .unwrap()
195 .get_prune_checkpoint(PruneSegment::SenderRecovery)
196 .unwrap(),
197 to_block,
198 limiter: limiter.clone(),
199 };
200
201 let next_tx_number_to_prune = db
202 .factory
203 .provider()
204 .unwrap()
205 .get_prune_checkpoint(PruneSegment::SenderRecovery)
206 .unwrap()
207 .and_then(|checkpoint| checkpoint.tx_number)
208 .map(|tx_number| tx_number + 1)
209 .unwrap_or_default();
210
211 let last_pruned_tx_number = blocks
212 .iter()
213 .take(to_block as usize)
214 .map(|block| block.transaction_count())
215 .sum::<usize>()
216 .min(
217 next_tx_number_to_prune as usize +
218 input.limiter.deleted_entries_limit().unwrap(),
219 )
220 .sub(1);
221
222 let last_pruned_block_number = blocks
223 .iter()
224 .fold_while((0, 0), |(_, mut tx_count), block| {
225 tx_count += block.transaction_count();
226
227 if tx_count > last_pruned_tx_number {
228 Done((block.number, tx_count))
229 } else {
230 Continue((block.number, tx_count))
231 }
232 })
233 .into_inner()
234 .0;
235
236 let provider = db.factory.database_provider_rw().unwrap();
237 let result = segment.prune(&provider, input).unwrap();
238 limiter.increment_deleted_entries_count_by(result.pruned);
239
240 assert_matches!(
241 result,
242 SegmentOutput {progress, pruned, checkpoint: Some(_)}
243 if (progress, pruned) == expected_result
244 );
245
246 segment
247 .save_checkpoint(
248 &provider,
249 result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
250 )
251 .unwrap();
252 provider.commit().expect("commit");
253
254 let last_pruned_block_number = last_pruned_block_number
255 .checked_sub(if result.progress.is_finished() { 0 } else { 1 });
256
257 assert_eq!(
258 db.table::<tables::TransactionSenders>().unwrap().len(),
259 transaction_senders_len - (last_pruned_tx_number + 1)
260 );
261 assert_eq!(
262 db.factory
263 .provider()
264 .unwrap()
265 .get_prune_checkpoint(PruneSegment::SenderRecovery)
266 .unwrap(),
267 Some(PruneCheckpoint {
268 block_number: last_pruned_block_number,
269 tx_number: Some(last_pruned_tx_number as TxNumber),
270 prune_mode
271 })
272 );
273 };
274
275 test_prune(
276 6,
277 (
278 PruneProgress::HasMoreData(
279 reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
280 ),
281 10,
282 ),
283 );
284 test_prune(6, (PruneProgress::Finished, 2));
285 test_prune(10, (PruneProgress::Finished, 8));
286 }
287}