reth_prune/segments/
receipts.rs

1//! Common receipts pruning logic.
2//!
3//! - [`crate::segments::user::Receipts`] is responsible for pruning receipts according to the
4//!   user-configured settings (for example, on a full node or with a custom prune config)
5
6use crate::{db_ext::DbTxPruneExt, segments::PruneInput, PrunerError};
7use reth_db_api::{table::Value, tables, transaction::DbTxMut};
8use reth_primitives_traits::NodePrimitives;
9use reth_provider::{
10    errors::provider::ProviderResult, BlockReader, DBProvider, NodePrimitivesProvider,
11    PruneCheckpointWriter, TransactionsProvider,
12};
13use reth_prune_types::{PruneCheckpoint, PruneSegment, SegmentOutput, SegmentOutputCheckpoint};
14use tracing::trace;
15
16pub(crate) fn prune<Provider>(
17    provider: &Provider,
18    input: PruneInput,
19) -> Result<SegmentOutput, PrunerError>
20where
21    Provider: DBProvider<Tx: DbTxMut>
22        + TransactionsProvider
23        + BlockReader
24        + NodePrimitivesProvider<Primitives: NodePrimitives<Receipt: Value>>,
25{
26    let tx_range = match input.get_next_tx_num_range(provider)? {
27        Some(range) => range,
28        None => {
29            trace!(target: "pruner", "No receipts to prune");
30            return Ok(SegmentOutput::done())
31        }
32    };
33    let tx_range_end = *tx_range.end();
34
35    let mut limiter = input.limiter;
36
37    let mut last_pruned_transaction = tx_range_end;
38    let (pruned, done) = provider.tx_ref().prune_table_with_range::<tables::Receipts<
39        <Provider::Primitives as NodePrimitives>::Receipt,
40    >>(
41        tx_range,
42        &mut limiter,
43        |_| false,
44        |row| last_pruned_transaction = row.0,
45    )?;
46    trace!(target: "pruner", %pruned, %done, "Pruned receipts");
47
48    let last_pruned_block = provider
49        .transaction_block(last_pruned_transaction)?
50        .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
51        // If there's more receipts to prune, set the checkpoint block number to previous,
52        // so we could finish pruning its receipts on the next run.
53        .checked_sub(if done { 0 } else { 1 });
54
55    let progress = limiter.progress(done);
56
57    Ok(SegmentOutput {
58        progress,
59        pruned,
60        checkpoint: Some(SegmentOutputCheckpoint {
61            block_number: last_pruned_block,
62            tx_number: Some(last_pruned_transaction),
63        }),
64    })
65}
66
67pub(crate) fn save_checkpoint(
68    provider: impl PruneCheckpointWriter,
69    checkpoint: PruneCheckpoint,
70) -> ProviderResult<()> {
71    provider.save_prune_checkpoint(PruneSegment::Receipts, checkpoint)?;
72
73    // `PruneSegment::Receipts` overrides `PruneSegment::ContractLogs`, so we can preemptively
74    // limit their pruning start point.
75    provider.save_prune_checkpoint(PruneSegment::ContractLogs, checkpoint)?;
76
77    Ok(())
78}
79
80#[cfg(test)]
81mod tests {
82    use crate::segments::{PruneInput, PruneLimiter, SegmentOutput};
83    use alloy_primitives::{BlockNumber, TxNumber, B256};
84    use assert_matches::assert_matches;
85    use itertools::{
86        FoldWhile::{Continue, Done},
87        Itertools,
88    };
89    use reth_db_api::tables;
90    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
91    use reth_prune_types::{
92        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
93    };
94    use reth_stages::test_utils::{StorageKind, TestStageDB};
95    use reth_testing_utils::generators::{
96        self, random_block_range, random_receipt, BlockRangeParams,
97    };
98    use std::ops::Sub;
99
100    #[test]
101    fn prune() {
102        let db = TestStageDB::default();
103        let mut rng = generators::rng();
104
105        let blocks = random_block_range(
106            &mut rng,
107            1..=10,
108            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
109        );
110        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
111
112        let mut receipts = Vec::new();
113        for block in &blocks {
114            receipts.reserve_exact(block.transaction_count());
115            for transaction in &block.body().transactions {
116                receipts.push((
117                    receipts.len() as u64,
118                    random_receipt(&mut rng, transaction, Some(0), None),
119                ));
120            }
121        }
122        let receipts_len = receipts.len();
123        db.insert_receipts(receipts).expect("insert receipts");
124
125        assert_eq!(
126            db.table::<tables::Transactions>().unwrap().len(),
127            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
128        );
129        assert_eq!(
130            db.table::<tables::Transactions>().unwrap().len(),
131            db.table::<tables::Receipts>().unwrap().len()
132        );
133
134        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
135            let prune_mode = PruneMode::Before(to_block);
136            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
137            let input = PruneInput {
138                previous_checkpoint: db
139                    .factory
140                    .provider()
141                    .unwrap()
142                    .get_prune_checkpoint(PruneSegment::Receipts)
143                    .unwrap(),
144                to_block,
145                limiter: limiter.clone(),
146            };
147
148            let next_tx_number_to_prune = db
149                .factory
150                .provider()
151                .unwrap()
152                .get_prune_checkpoint(PruneSegment::Receipts)
153                .unwrap()
154                .and_then(|checkpoint| checkpoint.tx_number)
155                .map(|tx_number| tx_number + 1)
156                .unwrap_or_default();
157
158            let last_pruned_tx_number = blocks
159                .iter()
160                .take(to_block as usize)
161                .map(|block| block.transaction_count())
162                .sum::<usize>()
163                .min(
164                    next_tx_number_to_prune as usize +
165                        input.limiter.deleted_entries_limit().unwrap(),
166                )
167                .sub(1);
168
169            let provider = db.factory.database_provider_rw().unwrap();
170            let result = super::prune(&provider, input).unwrap();
171            limiter.increment_deleted_entries_count_by(result.pruned);
172
173            assert_matches!(
174                result,
175                SegmentOutput {progress, pruned, checkpoint: Some(_)}
176                    if (progress, pruned) == expected_result
177            );
178
179            super::save_checkpoint(
180                &provider,
181                result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
182            )
183            .unwrap();
184            provider.commit().expect("commit");
185
186            let last_pruned_block_number = blocks
187                .iter()
188                .fold_while((0, 0), |(_, mut tx_count), block| {
189                    tx_count += block.transaction_count();
190
191                    if tx_count > last_pruned_tx_number {
192                        Done((block.number, tx_count))
193                    } else {
194                        Continue((block.number, tx_count))
195                    }
196                })
197                .into_inner()
198                .0
199                .checked_sub(if result.progress.is_finished() { 0 } else { 1 });
200
201            assert_eq!(
202                db.table::<tables::Receipts>().unwrap().len(),
203                receipts_len - (last_pruned_tx_number + 1)
204            );
205            assert_eq!(
206                db.factory
207                    .provider()
208                    .unwrap()
209                    .get_prune_checkpoint(PruneSegment::Receipts)
210                    .unwrap(),
211                Some(PruneCheckpoint {
212                    block_number: last_pruned_block_number,
213                    tx_number: Some(last_pruned_tx_number as TxNumber),
214                    prune_mode
215                })
216            );
217        };
218
219        test_prune(
220            6,
221            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
222        );
223        test_prune(6, (PruneProgress::Finished, 2));
224        test_prune(10, (PruneProgress::Finished, 8));
225    }
226}