reth_prune/segments/
receipts.rs

1//! Common receipts pruning logic.
2//!
3//! - [`crate::segments::user::Receipts`] is responsible for pruning receipts according to the
4//!   user-configured settings (for example, on a full node or with a custom prune config)
5
6use crate::{
7    db_ext::DbTxPruneExt,
8    segments::{self, PruneInput},
9    PrunerError,
10};
11use reth_db_api::{table::Value, tables, transaction::DbTxMut};
12use reth_primitives_traits::NodePrimitives;
13use reth_provider::{
14    errors::provider::ProviderResult, BlockReader, DBProvider, EitherWriter,
15    NodePrimitivesProvider, PruneCheckpointWriter, StaticFileProviderFactory, StorageSettingsCache,
16    TransactionsProvider,
17};
18use reth_prune_types::{PruneCheckpoint, PruneSegment, SegmentOutput, SegmentOutputCheckpoint};
19use reth_static_file_types::StaticFileSegment;
20use tracing::{debug, trace};
21
22pub(crate) fn prune<Provider>(
23    provider: &Provider,
24    input: PruneInput,
25) -> Result<SegmentOutput, PrunerError>
26where
27    Provider: DBProvider<Tx: DbTxMut>
28        + TransactionsProvider
29        + BlockReader
30        + StorageSettingsCache
31        + StaticFileProviderFactory
32        + NodePrimitivesProvider<Primitives: NodePrimitives<Receipt: Value>>,
33{
34    if EitherWriter::receipts_destination(provider).is_static_file() {
35        debug!(target: "pruner", "Pruning receipts from static files.");
36        return segments::prune_static_files(provider, input, StaticFileSegment::Receipts)
37    }
38    debug!(target: "pruner", "Pruning receipts from database.");
39
40    // Original database implementation for when receipts are not on static files (old nodes)
41    let tx_range = match input.get_next_tx_num_range(provider)? {
42        Some(range) => range,
43        None => {
44            trace!(target: "pruner", "No receipts to prune");
45            return Ok(SegmentOutput::done())
46        }
47    };
48    let tx_range_end = *tx_range.end();
49
50    let mut limiter = input.limiter;
51
52    let mut last_pruned_transaction = tx_range_end;
53    let (pruned, done) = provider.tx_ref().prune_table_with_range::<tables::Receipts<
54        <Provider::Primitives as NodePrimitives>::Receipt,
55    >>(
56        tx_range,
57        &mut limiter,
58        |_| false,
59        |row| last_pruned_transaction = row.0,
60    )?;
61    trace!(target: "pruner", %pruned, %done, "Pruned receipts");
62
63    let last_pruned_block = provider
64        .block_by_transaction_id(last_pruned_transaction)?
65        .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
66        // If there's more receipts to prune, set the checkpoint block number to previous,
67        // so we could finish pruning its receipts on the next run.
68        .checked_sub(if done { 0 } else { 1 });
69
70    let progress = limiter.progress(done);
71
72    Ok(SegmentOutput {
73        progress,
74        pruned,
75        checkpoint: Some(SegmentOutputCheckpoint {
76            block_number: last_pruned_block,
77            tx_number: Some(last_pruned_transaction),
78        }),
79    })
80}
81
82pub(crate) fn save_checkpoint(
83    provider: impl PruneCheckpointWriter,
84    checkpoint: PruneCheckpoint,
85) -> ProviderResult<()> {
86    provider.save_prune_checkpoint(PruneSegment::Receipts, checkpoint)?;
87
88    // `PruneSegment::Receipts` overrides `PruneSegment::ContractLogs`, so we can preemptively
89    // limit their pruning start point.
90    provider.save_prune_checkpoint(PruneSegment::ContractLogs, checkpoint)?;
91
92    Ok(())
93}
94
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, SegmentOutput};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::tables;
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{
        self, random_block_range, random_receipt, BlockRangeParams,
    };

    /// Exercises the database pruning path over three consecutive runs, checking the reported
    /// progress, the number of remaining receipt rows and the stored checkpoint after each one.
    #[test]
    fn prune_legacy() {
        let mut db = TestStageDB::default();
        // Configure the factory to use database for receipts by enabling receipt pruning.
        // This ensures EitherWriter::receipts_destination returns Database instead of StaticFile.
        db.factory = db.factory.with_prune_modes(reth_prune_types::PruneModes {
            receipts: Some(PruneMode::Full),
            ..Default::default()
        });
        let mut rng = generators::rng();

        // Blocks 1..=10; the expected counts below (10, 2, 8) assume two transactions per block.
        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // One receipt per transaction, keyed by a running transaction number starting at zero.
        let mut receipts = Vec::new();
        for block in &blocks {
            receipts.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                let tx_number = receipts.len() as u64;
                receipts.push((tx_number, random_receipt(&mut rng, transaction, Some(0), None)));
            }
        }
        let receipts_len = receipts.len();
        db.insert_receipts(receipts).expect("insert receipts");

        // Sanity check: every generated transaction is stored and has a matching receipt row.
        let generated_transactions: usize =
            blocks.iter().map(|block| block.transaction_count()).sum();
        let stored_transactions = db.table::<tables::Transactions>().unwrap().len();
        assert_eq!(stored_transactions, generated_transactions);
        assert_eq!(stored_transactions, db.table::<tables::Receipts>().unwrap().len());

        // Reads the currently stored receipts prune checkpoint through a fresh provider.
        let receipts_checkpoint = || {
            db.factory.provider().unwrap().get_prune_checkpoint(PruneSegment::Receipts).unwrap()
        };

        let run_case = |to_block: BlockNumber, expected: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
            let input = PruneInput {
                previous_checkpoint: receipts_checkpoint(),
                to_block,
                limiter: limiter.clone(),
            };

            // Pruning resumes right after the checkpointed transaction, or at zero when there
            // is no checkpoint yet.
            let first_tx_to_prune = receipts_checkpoint()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map_or(0, |tx_number| tx_number + 1);

            // The run is expected to stop at whichever comes first: the last transaction of
            // the first `to_block` blocks, or the deleted-entries limit.
            let txs_up_to_target: usize =
                blocks.iter().take(to_block as usize).map(|block| block.transaction_count()).sum();
            let limit_ceiling =
                first_tx_to_prune as usize + input.limiter.deleted_entries_limit().unwrap();
            let last_pruned_tx_number = txs_up_to_target.min(limit_ceiling) - 1;

            let provider = db.factory.database_provider_rw().unwrap();
            let result = super::prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected
            );

            super::save_checkpoint(
                &provider,
                result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
            )
            .unwrap();
            provider.commit().expect("commit");

            // Find the first block whose cumulative transaction count exceeds the last pruned
            // transaction number; falls through to the last block when none does.
            let mut cumulative_txs = 0usize;
            let mut block_of_last_tx: BlockNumber = 0;
            for block in &blocks {
                cumulative_txs += block.transaction_count();
                block_of_last_tx = block.number;
                if cumulative_txs > last_pruned_tx_number {
                    break
                }
            }
            // An unfinished run is expected to checkpoint the previous block.
            let last_pruned_block_number = if result.progress.is_finished() {
                Some(block_of_last_tx)
            } else {
                block_of_last_tx.checked_sub(1)
            };

            assert_eq!(
                db.table::<tables::Receipts>().unwrap().len(),
                receipts_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                receipts_checkpoint(),
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        // First run hits the deleted-entries limit, second run finishes the same target, third
        // run prunes the rest up to block 10.
        run_case(
            6,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
        );
        run_case(6, (PruneProgress::Finished, 2));
        run_case(10, (PruneProgress::Finished, 8));
    }
}