1use crate::{
7 db_ext::DbTxPruneExt,
8 segments::{self, PruneInput},
9 PrunerError,
10};
11use reth_db_api::{table::Value, tables, transaction::DbTxMut};
12use reth_primitives_traits::NodePrimitives;
13use reth_provider::{
14 errors::provider::ProviderResult, BlockReader, DBProvider, EitherWriter,
15 NodePrimitivesProvider, PruneCheckpointWriter, StaticFileProviderFactory, StorageSettingsCache,
16 TransactionsProvider,
17};
18use reth_prune_types::{PruneCheckpoint, PruneSegment, SegmentOutput, SegmentOutputCheckpoint};
19use reth_static_file_types::StaticFileSegment;
20use tracing::{debug, trace};
21
22pub(crate) fn prune<Provider>(
23 provider: &Provider,
24 input: PruneInput,
25) -> Result<SegmentOutput, PrunerError>
26where
27 Provider: DBProvider<Tx: DbTxMut>
28 + TransactionsProvider
29 + BlockReader
30 + StorageSettingsCache
31 + StaticFileProviderFactory
32 + NodePrimitivesProvider<Primitives: NodePrimitives<Receipt: Value>>,
33{
34 if EitherWriter::receipts_destination(provider).is_static_file() {
35 debug!(target: "pruner", "Pruning receipts from static files.");
36 return segments::prune_static_files(provider, input, StaticFileSegment::Receipts)
37 }
38 debug!(target: "pruner", "Pruning receipts from database.");
39
40 let tx_range = match input.get_next_tx_num_range(provider)? {
42 Some(range) => range,
43 None => {
44 trace!(target: "pruner", "No receipts to prune");
45 return Ok(SegmentOutput::done())
46 }
47 };
48 let tx_range_end = *tx_range.end();
49
50 let mut limiter = input.limiter;
51
52 let mut last_pruned_transaction = tx_range_end;
53 let (pruned, done) = provider.tx_ref().prune_table_with_range::<tables::Receipts<
54 <Provider::Primitives as NodePrimitives>::Receipt,
55 >>(
56 tx_range,
57 &mut limiter,
58 |_| false,
59 |row| last_pruned_transaction = row.0,
60 )?;
61 trace!(target: "pruner", %pruned, %done, "Pruned receipts");
62
63 let last_pruned_block = provider
64 .block_by_transaction_id(last_pruned_transaction)?
65 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
66 .checked_sub(if done { 0 } else { 1 });
69
70 let progress = limiter.progress(done);
71
72 Ok(SegmentOutput {
73 progress,
74 pruned,
75 checkpoint: Some(SegmentOutputCheckpoint {
76 block_number: last_pruned_block,
77 tx_number: Some(last_pruned_transaction),
78 }),
79 })
80}
81
82pub(crate) fn save_checkpoint(
83 provider: impl PruneCheckpointWriter,
84 checkpoint: PruneCheckpoint,
85) -> ProviderResult<()> {
86 provider.save_prune_checkpoint(PruneSegment::Receipts, checkpoint)?;
87
88 provider.save_prune_checkpoint(PruneSegment::ContractLogs, checkpoint)?;
91
92 Ok(())
93}
94
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, SegmentOutput};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::tables;
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneModes, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{
        self, random_block_range, random_receipt, BlockRangeParams,
    };

    /// Prunes database-stored receipts in three consecutive runs and checks, after each run, the
    /// reported progress, the number of remaining rows and the persisted checkpoint.
    #[test]
    fn prune_legacy() {
        let mut db = TestStageDB::default();
        // NOTE(review): presumably enabling receipt pruning on the factory is what routes
        // receipts to the database instead of static files -- confirm.
        db.factory = db
            .factory
            .with_prune_modes(PruneModes { receipts: Some(PruneMode::Full), ..Default::default() });
        let mut rng = generators::rng();

        // Blocks 1..=10 with a `2..3` transaction count range.
        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let total_txs: usize = blocks.iter().map(|block| block.transaction_count()).sum();

        // One receipt per transaction, keyed by a running transaction number starting at zero.
        let mut receipts = Vec::with_capacity(total_txs);
        for transaction in blocks.iter().flat_map(|block| block.body().transactions.iter()) {
            let tx_number = receipts.len() as u64;
            receipts.push((tx_number, random_receipt(&mut rng, transaction, Some(0), None)));
        }
        let receipts_len = receipts.len();
        db.insert_receipts(receipts).expect("insert receipts");

        // Sanity checks: every transaction was stored and each one has a receipt.
        assert_eq!(db.table::<tables::Transactions>().unwrap().len(), total_txs);
        assert_eq!(
            db.table::<tables::Transactions>().unwrap().len(),
            db.table::<tables::Receipts>().unwrap().len()
        );

        // Reads the currently persisted checkpoint of the receipts segment.
        let receipts_checkpoint = || {
            db.factory.provider().unwrap().get_prune_checkpoint(PruneSegment::Receipts).unwrap()
        };

        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
            let input = PruneInput {
                previous_checkpoint: receipts_checkpoint(),
                to_block,
                limiter: limiter.clone(),
            };

            // First transaction number not covered by the previous checkpoint (zero if none).
            let next_tx_number_to_prune = receipts_checkpoint()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map_or(0, |tx_number| tx_number + 1);

            // The run stops either at the last transaction of the target block or once the
            // deleted entries limit is exhausted, whichever comes first.
            let txs_up_to_target: usize = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum();
            let txs_allowed_by_limit = next_tx_number_to_prune as usize +
                input.limiter.deleted_entries_limit().unwrap();
            let last_pruned_tx_number = txs_up_to_target.min(txs_allowed_by_limit) - 1;

            let provider = db.factory.database_provider_rw().unwrap();
            let result = super::prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput { progress, pruned, checkpoint: Some(_) }
                    if (progress, pruned) == expected_result
            );

            let checkpoint_to_save = result.checkpoint.unwrap().as_prune_checkpoint(prune_mode);
            super::save_checkpoint(&provider, checkpoint_to_save).unwrap();
            provider.commit().expect("commit");

            // Find the block containing the last pruned transaction: the first block whose
            // cumulative transaction count exceeds that transaction number.
            let mut block_of_last_pruned_tx: BlockNumber = 0;
            let mut cumulative_txs = 0usize;
            for block in blocks.iter() {
                cumulative_txs += block.transaction_count();
                block_of_last_pruned_tx = block.number;
                if cumulative_txs > last_pruned_tx_number {
                    break
                }
            }
            // An unfinished run is expected to checkpoint one block earlier.
            let step_back = if result.progress.is_finished() { 0 } else { 1 };
            let last_pruned_block_number = block_of_last_pruned_tx.checked_sub(step_back);

            assert_eq!(
                db.table::<tables::Receipts>().unwrap().len(),
                receipts_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                receipts_checkpoint(),
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        // Scenarios run in order: (target block, (expected progress, expected pruned count)).
        let limit_reached =
            PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached);
        let scenarios = [
            (6, (limit_reached, 10)),
            (6, (PruneProgress::Finished, 2)),
            (10, (PruneProgress::Finished, 8)),
        ];
        for (to_block, expected_result) in scenarios {
            test_prune(to_block, expected_result);
        }
    }
}