reth_prune/segments/
receipts.rs

1//! Common receipts pruning logic shared between user and static file pruning segments.
2//!
3//! - [`crate::segments::user::Receipts`] is responsible for pruning receipts according to the
4//!   user-configured settings (for example, on a full node or with a custom prune config)
5//! - [`crate::segments::static_file::Receipts`] is responsible for pruning receipts on an archive
6//!   node after static file producer has finished
7
8use crate::{db_ext::DbTxPruneExt, segments::PruneInput, PrunerError};
9use reth_db_api::{table::Value, tables, transaction::DbTxMut};
10use reth_primitives_traits::NodePrimitives;
11use reth_provider::{
12    errors::provider::ProviderResult, BlockReader, DBProvider, NodePrimitivesProvider,
13    PruneCheckpointWriter, TransactionsProvider,
14};
15use reth_prune_types::{PruneCheckpoint, PruneSegment, SegmentOutput, SegmentOutputCheckpoint};
16use tracing::trace;
17
18pub(crate) fn prune<Provider>(
19    provider: &Provider,
20    input: PruneInput,
21) -> Result<SegmentOutput, PrunerError>
22where
23    Provider: DBProvider<Tx: DbTxMut>
24        + TransactionsProvider
25        + BlockReader
26        + NodePrimitivesProvider<Primitives: NodePrimitives<Receipt: Value>>,
27{
28    let tx_range = match input.get_next_tx_num_range(provider)? {
29        Some(range) => range,
30        None => {
31            trace!(target: "pruner", "No receipts to prune");
32            return Ok(SegmentOutput::done())
33        }
34    };
35    let tx_range_end = *tx_range.end();
36
37    let mut limiter = input.limiter;
38
39    let mut last_pruned_transaction = tx_range_end;
40    let (pruned, done) = provider.tx_ref().prune_table_with_range::<tables::Receipts<
41        <Provider::Primitives as NodePrimitives>::Receipt,
42    >>(
43        tx_range,
44        &mut limiter,
45        |_| false,
46        |row| last_pruned_transaction = row.0,
47    )?;
48    trace!(target: "pruner", %pruned, %done, "Pruned receipts");
49
50    let last_pruned_block = provider
51        .transaction_block(last_pruned_transaction)?
52        .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
53        // If there's more receipts to prune, set the checkpoint block number to previous,
54        // so we could finish pruning its receipts on the next run.
55        .checked_sub(if done { 0 } else { 1 });
56
57    let progress = limiter.progress(done);
58
59    Ok(SegmentOutput {
60        progress,
61        pruned,
62        checkpoint: Some(SegmentOutputCheckpoint {
63            block_number: last_pruned_block,
64            tx_number: Some(last_pruned_transaction),
65        }),
66    })
67}
68
69pub(crate) fn save_checkpoint(
70    provider: impl PruneCheckpointWriter,
71    checkpoint: PruneCheckpoint,
72) -> ProviderResult<()> {
73    provider.save_prune_checkpoint(PruneSegment::Receipts, checkpoint)?;
74
75    // `PruneSegment::Receipts` overrides `PruneSegment::ContractLogs`, so we can preemptively
76    // limit their pruning start point.
77    provider.save_prune_checkpoint(PruneSegment::ContractLogs, checkpoint)?;
78
79    Ok(())
80}
81
82#[cfg(test)]
83mod tests {
84    use crate::segments::{PruneInput, PruneLimiter, SegmentOutput};
85    use alloy_primitives::{BlockNumber, TxNumber, B256};
86    use assert_matches::assert_matches;
87    use itertools::{
88        FoldWhile::{Continue, Done},
89        Itertools,
90    };
91    use reth_db_api::tables;
92    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
93    use reth_prune_types::{
94        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
95    };
96    use reth_stages::test_utils::{StorageKind, TestStageDB};
97    use reth_testing_utils::generators::{
98        self, random_block_range, random_receipt, BlockRangeParams,
99    };
100    use std::ops::Sub;
101
102    #[test]
103    fn prune() {
104        let db = TestStageDB::default();
105        let mut rng = generators::rng();
106
107        let blocks = random_block_range(
108            &mut rng,
109            1..=10,
110            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
111        );
112        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
113
114        let mut receipts = Vec::new();
115        for block in &blocks {
116            receipts.reserve_exact(block.transaction_count());
117            for transaction in &block.body().transactions {
118                receipts.push((
119                    receipts.len() as u64,
120                    random_receipt(&mut rng, transaction, Some(0), None),
121                ));
122            }
123        }
124        let receipts_len = receipts.len();
125        db.insert_receipts(receipts).expect("insert receipts");
126
127        assert_eq!(
128            db.table::<tables::Transactions>().unwrap().len(),
129            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
130        );
131        assert_eq!(
132            db.table::<tables::Transactions>().unwrap().len(),
133            db.table::<tables::Receipts>().unwrap().len()
134        );
135
136        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
137            let prune_mode = PruneMode::Before(to_block);
138            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
139            let input = PruneInput {
140                previous_checkpoint: db
141                    .factory
142                    .provider()
143                    .unwrap()
144                    .get_prune_checkpoint(PruneSegment::Receipts)
145                    .unwrap(),
146                to_block,
147                limiter: limiter.clone(),
148            };
149
150            let next_tx_number_to_prune = db
151                .factory
152                .provider()
153                .unwrap()
154                .get_prune_checkpoint(PruneSegment::Receipts)
155                .unwrap()
156                .and_then(|checkpoint| checkpoint.tx_number)
157                .map(|tx_number| tx_number + 1)
158                .unwrap_or_default();
159
160            let last_pruned_tx_number = blocks
161                .iter()
162                .take(to_block as usize)
163                .map(|block| block.transaction_count())
164                .sum::<usize>()
165                .min(
166                    next_tx_number_to_prune as usize +
167                        input.limiter.deleted_entries_limit().unwrap(),
168                )
169                .sub(1);
170
171            let provider = db.factory.database_provider_rw().unwrap();
172            let result = super::prune(&provider, input).unwrap();
173            limiter.increment_deleted_entries_count_by(result.pruned);
174
175            assert_matches!(
176                result,
177                SegmentOutput {progress, pruned, checkpoint: Some(_)}
178                    if (progress, pruned) == expected_result
179            );
180
181            super::save_checkpoint(
182                &provider,
183                result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
184            )
185            .unwrap();
186            provider.commit().expect("commit");
187
188            let last_pruned_block_number = blocks
189                .iter()
190                .fold_while((0, 0), |(_, mut tx_count), block| {
191                    tx_count += block.transaction_count();
192
193                    if tx_count > last_pruned_tx_number {
194                        Done((block.number, tx_count))
195                    } else {
196                        Continue((block.number, tx_count))
197                    }
198                })
199                .into_inner()
200                .0
201                .checked_sub(if result.progress.is_finished() { 0 } else { 1 });
202
203            assert_eq!(
204                db.table::<tables::Receipts>().unwrap().len(),
205                receipts_len - (last_pruned_tx_number + 1)
206            );
207            assert_eq!(
208                db.factory
209                    .provider()
210                    .unwrap()
211                    .get_prune_checkpoint(PruneSegment::Receipts)
212                    .unwrap(),
213                Some(PruneCheckpoint {
214                    block_number: last_pruned_block_number,
215                    tx_number: Some(last_pruned_tx_number as TxNumber),
216                    prune_mode
217                })
218            );
219        };
220
221        test_prune(
222            6,
223            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
224        );
225        test_prune(6, (PruneProgress::Finished, 2));
226        test_prune(10, (PruneProgress::Finished, 8));
227    }
228}