1use crate::{db_ext::DbTxPruneExt, segments::PruneInput, PrunerError};
7use reth_db_api::{table::Value, tables, transaction::DbTxMut};
8use reth_primitives_traits::NodePrimitives;
9use reth_provider::{
10 errors::provider::ProviderResult, BlockReader, DBProvider, NodePrimitivesProvider,
11 PruneCheckpointWriter, TransactionsProvider,
12};
13use reth_prune_types::{PruneCheckpoint, PruneSegment, SegmentOutput, SegmentOutputCheckpoint};
14use tracing::trace;
15
16pub(crate) fn prune<Provider>(
17 provider: &Provider,
18 input: PruneInput,
19) -> Result<SegmentOutput, PrunerError>
20where
21 Provider: DBProvider<Tx: DbTxMut>
22 + TransactionsProvider
23 + BlockReader
24 + NodePrimitivesProvider<Primitives: NodePrimitives<Receipt: Value>>,
25{
26 let tx_range = match input.get_next_tx_num_range(provider)? {
27 Some(range) => range,
28 None => {
29 trace!(target: "pruner", "No receipts to prune");
30 return Ok(SegmentOutput::done())
31 }
32 };
33 let tx_range_end = *tx_range.end();
34
35 let mut limiter = input.limiter;
36
37 let mut last_pruned_transaction = tx_range_end;
38 let (pruned, done) = provider.tx_ref().prune_table_with_range::<tables::Receipts<
39 <Provider::Primitives as NodePrimitives>::Receipt,
40 >>(
41 tx_range,
42 &mut limiter,
43 |_| false,
44 |row| last_pruned_transaction = row.0,
45 )?;
46 trace!(target: "pruner", %pruned, %done, "Pruned receipts");
47
48 let last_pruned_block = provider
49 .transaction_block(last_pruned_transaction)?
50 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
51 .checked_sub(if done { 0 } else { 1 });
54
55 let progress = limiter.progress(done);
56
57 Ok(SegmentOutput {
58 progress,
59 pruned,
60 checkpoint: Some(SegmentOutputCheckpoint {
61 block_number: last_pruned_block,
62 tx_number: Some(last_pruned_transaction),
63 }),
64 })
65}
66
67pub(crate) fn save_checkpoint(
68 provider: impl PruneCheckpointWriter,
69 checkpoint: PruneCheckpoint,
70) -> ProviderResult<()> {
71 provider.save_prune_checkpoint(PruneSegment::Receipts, checkpoint)?;
72
73 provider.save_prune_checkpoint(PruneSegment::ContractLogs, checkpoint)?;
76
77 Ok(())
78}
79
80#[cfg(test)]
81mod tests {
82 use crate::segments::{PruneInput, PruneLimiter, SegmentOutput};
83 use alloy_primitives::{BlockNumber, TxNumber, B256};
84 use assert_matches::assert_matches;
85 use itertools::{
86 FoldWhile::{Continue, Done},
87 Itertools,
88 };
89 use reth_db_api::tables;
90 use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
91 use reth_prune_types::{
92 PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
93 };
94 use reth_stages::test_utils::{StorageKind, TestStageDB};
95 use reth_testing_utils::generators::{
96 self, random_block_range, random_receipt, BlockRangeParams,
97 };
98 use std::ops::Sub;
99
100 #[test]
101 fn prune() {
102 let db = TestStageDB::default();
103 let mut rng = generators::rng();
104
105 let blocks = random_block_range(
106 &mut rng,
107 1..=10,
108 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
109 );
110 db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
111
112 let mut receipts = Vec::new();
113 for block in &blocks {
114 receipts.reserve_exact(block.transaction_count());
115 for transaction in &block.body().transactions {
116 receipts.push((
117 receipts.len() as u64,
118 random_receipt(&mut rng, transaction, Some(0), None),
119 ));
120 }
121 }
122 let receipts_len = receipts.len();
123 db.insert_receipts(receipts).expect("insert receipts");
124
125 assert_eq!(
126 db.table::<tables::Transactions>().unwrap().len(),
127 blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
128 );
129 assert_eq!(
130 db.table::<tables::Transactions>().unwrap().len(),
131 db.table::<tables::Receipts>().unwrap().len()
132 );
133
134 let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
135 let prune_mode = PruneMode::Before(to_block);
136 let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
137 let input = PruneInput {
138 previous_checkpoint: db
139 .factory
140 .provider()
141 .unwrap()
142 .get_prune_checkpoint(PruneSegment::Receipts)
143 .unwrap(),
144 to_block,
145 limiter: limiter.clone(),
146 };
147
148 let next_tx_number_to_prune = db
149 .factory
150 .provider()
151 .unwrap()
152 .get_prune_checkpoint(PruneSegment::Receipts)
153 .unwrap()
154 .and_then(|checkpoint| checkpoint.tx_number)
155 .map(|tx_number| tx_number + 1)
156 .unwrap_or_default();
157
158 let last_pruned_tx_number = blocks
159 .iter()
160 .take(to_block as usize)
161 .map(|block| block.transaction_count())
162 .sum::<usize>()
163 .min(
164 next_tx_number_to_prune as usize +
165 input.limiter.deleted_entries_limit().unwrap(),
166 )
167 .sub(1);
168
169 let provider = db.factory.database_provider_rw().unwrap();
170 let result = super::prune(&provider, input).unwrap();
171 limiter.increment_deleted_entries_count_by(result.pruned);
172
173 assert_matches!(
174 result,
175 SegmentOutput {progress, pruned, checkpoint: Some(_)}
176 if (progress, pruned) == expected_result
177 );
178
179 super::save_checkpoint(
180 &provider,
181 result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
182 )
183 .unwrap();
184 provider.commit().expect("commit");
185
186 let last_pruned_block_number = blocks
187 .iter()
188 .fold_while((0, 0), |(_, mut tx_count), block| {
189 tx_count += block.transaction_count();
190
191 if tx_count > last_pruned_tx_number {
192 Done((block.number, tx_count))
193 } else {
194 Continue((block.number, tx_count))
195 }
196 })
197 .into_inner()
198 .0
199 .checked_sub(if result.progress.is_finished() { 0 } else { 1 });
200
201 assert_eq!(
202 db.table::<tables::Receipts>().unwrap().len(),
203 receipts_len - (last_pruned_tx_number + 1)
204 );
205 assert_eq!(
206 db.factory
207 .provider()
208 .unwrap()
209 .get_prune_checkpoint(PruneSegment::Receipts)
210 .unwrap(),
211 Some(PruneCheckpoint {
212 block_number: last_pruned_block_number,
213 tx_number: Some(last_pruned_tx_number as TxNumber),
214 prune_mode
215 })
216 );
217 };
218
219 test_prune(
220 6,
221 (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
222 );
223 test_prune(6, (PruneProgress::Finished, 2));
224 test_prune(10, (PruneProgress::Finished, 8));
225 }
226}