1use crate::{db_ext::DbTxPruneExt, segments::PruneInput, PrunerError};
9use reth_db_api::{table::Value, tables, transaction::DbTxMut};
10use reth_primitives_traits::NodePrimitives;
11use reth_provider::{
12 errors::provider::ProviderResult, BlockReader, DBProvider, NodePrimitivesProvider,
13 PruneCheckpointWriter, TransactionsProvider,
14};
15use reth_prune_types::{PruneCheckpoint, PruneSegment, SegmentOutput, SegmentOutputCheckpoint};
16use tracing::trace;
17
18pub(crate) fn prune<Provider>(
19 provider: &Provider,
20 input: PruneInput,
21) -> Result<SegmentOutput, PrunerError>
22where
23 Provider: DBProvider<Tx: DbTxMut>
24 + TransactionsProvider
25 + BlockReader
26 + NodePrimitivesProvider<Primitives: NodePrimitives<Receipt: Value>>,
27{
28 let tx_range = match input.get_next_tx_num_range(provider)? {
29 Some(range) => range,
30 None => {
31 trace!(target: "pruner", "No receipts to prune");
32 return Ok(SegmentOutput::done())
33 }
34 };
35 let tx_range_end = *tx_range.end();
36
37 let mut limiter = input.limiter;
38
39 let mut last_pruned_transaction = tx_range_end;
40 let (pruned, done) = provider.tx_ref().prune_table_with_range::<tables::Receipts<
41 <Provider::Primitives as NodePrimitives>::Receipt,
42 >>(
43 tx_range,
44 &mut limiter,
45 |_| false,
46 |row| last_pruned_transaction = row.0,
47 )?;
48 trace!(target: "pruner", %pruned, %done, "Pruned receipts");
49
50 let last_pruned_block = provider
51 .transaction_block(last_pruned_transaction)?
52 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
53 .checked_sub(if done { 0 } else { 1 });
56
57 let progress = limiter.progress(done);
58
59 Ok(SegmentOutput {
60 progress,
61 pruned,
62 checkpoint: Some(SegmentOutputCheckpoint {
63 block_number: last_pruned_block,
64 tx_number: Some(last_pruned_transaction),
65 }),
66 })
67}
68
69pub(crate) fn save_checkpoint(
70 provider: impl PruneCheckpointWriter,
71 checkpoint: PruneCheckpoint,
72) -> ProviderResult<()> {
73 provider.save_prune_checkpoint(PruneSegment::Receipts, checkpoint)?;
74
75 provider.save_prune_checkpoint(PruneSegment::ContractLogs, checkpoint)?;
78
79 Ok(())
80}
81
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, SegmentOutput};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use itertools::{
        FoldWhile::{Continue, Done},
        Itertools,
    };
    use reth_db_api::tables;
    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{
        self, random_block_range, random_receipt, BlockRangeParams,
    };
    use std::ops::Sub;

    /// Prunes receipts in three consecutive runs with a deleted-entries limit of 10 per run and
    /// checks, after every run, the reported progress, the number of receipts left in the
    /// database and the checkpoint that was saved.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Blocks 1..=10 with a transaction count drawn from `2..3`.
        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // One receipt per transaction, keyed by a running transaction number starting at zero.
        let mut receipts = Vec::new();
        let mut next_tx_number: u64 = 0;
        for block in blocks.iter() {
            receipts.reserve_exact(block.transaction_count());
            for transaction in block.body().transactions.iter() {
                let receipt = random_receipt(&mut rng, transaction, Some(0), None);
                receipts.push((next_tx_number, receipt));
                next_tx_number += 1;
            }
        }
        let receipts_len = receipts.len();
        db.insert_receipts(receipts).expect("insert receipts");

        // Sanity check: every generated transaction is stored and has exactly one receipt.
        let stored_transactions = db.table::<tables::Transactions>().unwrap().len();
        let generated_transactions: usize =
            blocks.iter().map(|block| block.transaction_count()).sum();
        assert_eq!(stored_transactions, generated_transactions);
        assert_eq!(stored_transactions, db.table::<tables::Receipts>().unwrap().len());

        // Runs a single pruning pass up to `to_block` and verifies its outcome against
        // `expected_result`, given as `(progress, number of pruned entries)`.
        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);

            let previous_checkpoint = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::Receipts)
                .unwrap();
            let input = PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() };

            // First transaction this run is expected to touch: the one right after the
            // previously checkpointed transaction, or zero when no checkpoint exists yet.
            let next_tx_number_to_prune = previous_checkpoint
                .and_then(|checkpoint| checkpoint.tx_number)
                .map(|tx_number| tx_number + 1)
                .unwrap_or_default();

            // The run ends either at the last transaction of the target block or where the
            // deleted-entries limit is exhausted, whichever comes first.
            let txs_up_to_target: usize = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum();
            let txs_allowed_by_limit =
                next_tx_number_to_prune as usize + input.limiter.deleted_entries_limit().unwrap();
            let last_pruned_tx_number = txs_up_to_target.min(txs_allowed_by_limit).sub(1);

            let provider = db.factory.database_provider_rw().unwrap();
            let result = super::prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            let checkpoint = result.checkpoint.unwrap().as_prune_checkpoint(prune_mode);
            super::save_checkpoint(&provider, checkpoint).unwrap();
            provider.commit().expect("commit");

            // Walk the blocks until the cumulative transaction count passes the last pruned
            // transaction; the block reached at that point contains it.
            let (block_with_last_pruned_tx, _) = blocks
                .iter()
                .fold_while((0, 0), |(_, txs_before), block| {
                    let txs_through_block = txs_before + block.transaction_count();
                    let state = (block.number, txs_through_block);

                    if txs_through_block > last_pruned_tx_number {
                        Done(state)
                    } else {
                        Continue(state)
                    }
                })
                .into_inner();
            // An unfinished run is expected to checkpoint the block before that one.
            let rollback = if result.progress.is_finished() { 0 } else { 1 };
            let last_pruned_block_number = block_with_last_pruned_tx.checked_sub(rollback);

            assert_eq!(
                db.table::<tables::Receipts>().unwrap().len(),
                receipts_len - (last_pruned_tx_number + 1)
            );

            let saved_checkpoint = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::Receipts)
                .unwrap();
            assert_eq!(
                saved_checkpoint,
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        // (target block, (expected progress, expected number of pruned receipts))
        let cases = [
            (6, (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10)),
            (6, (PruneProgress::Finished, 2)),
            (10, (PruneProgress::Finished, 8)),
        ];
        for (to_block, expected_result) in cases {
            test_prune(to_block, expected_result);
        }
    }
}