reth_prune/segments/user/
receipts_by_logs.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{PruneInput, Segment},
4    PrunerError,
5};
6use alloy_consensus::TxReceipt;
7use reth_db_api::{table::Value, tables, transaction::DbTxMut};
8use reth_primitives_traits::NodePrimitives;
9use reth_provider::{
10    BlockReader, DBProvider, NodePrimitivesProvider, PruneCheckpointWriter, TransactionsProvider,
11};
12use reth_prune_types::{
13    PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig, SegmentOutput,
14    MINIMUM_PRUNING_DISTANCE,
15};
16use tracing::{instrument, trace};
17#[derive(Debug)]
18pub struct ReceiptsByLogs {
19    config: ReceiptsLogPruneConfig,
20}
21
22impl ReceiptsByLogs {
23    pub const fn new(config: ReceiptsLogPruneConfig) -> Self {
24        Self { config }
25    }
26}
27
28impl<Provider> Segment<Provider> for ReceiptsByLogs
29where
30    Provider: DBProvider<Tx: DbTxMut>
31        + PruneCheckpointWriter
32        + TransactionsProvider
33        + BlockReader
34        + NodePrimitivesProvider<Primitives: NodePrimitives<Receipt: Value>>,
35{
36    fn segment(&self) -> PruneSegment {
37        PruneSegment::ContractLogs
38    }
39
40    fn mode(&self) -> Option<PruneMode> {
41        None
42    }
43
44    fn purpose(&self) -> PrunePurpose {
45        PrunePurpose::User
46    }
47
48    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
49    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
50        // Contract log filtering removes every receipt possible except the ones in the list. So,
51        // for the other receipts it's as if they had a `PruneMode::Distance()` of
52        // `MINIMUM_PRUNING_DISTANCE`.
53        let to_block = PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)
54            .prune_target_block(input.to_block, PruneSegment::ContractLogs, PrunePurpose::User)?
55            .map(|(bn, _)| bn)
56            .unwrap_or_default();
57
58        // Get status checkpoint from latest run
59        let mut last_pruned_block =
60            input.previous_checkpoint.and_then(|checkpoint| checkpoint.block_number);
61
62        let initial_last_pruned_block = last_pruned_block;
63
64        let mut from_tx_number = match initial_last_pruned_block {
65            Some(block) => provider
66                .block_body_indices(block)?
67                .map(|block| block.last_tx_num() + 1)
68                .unwrap_or(0),
69            None => 0,
70        };
71
72        // Figure out what receipts have already been pruned, so we can have an accurate
73        // `address_filter`
74        let address_filter = self.config.group_by_block(input.to_block, last_pruned_block)?;
75
76        // Splits all transactions in different block ranges. Each block range will have its own
77        // filter address list and will check it while going through the table
78        //
79        // Example:
80        // For an `address_filter` such as:
81        // { block9: [a1, a2], block20: [a3, a4, a5] }
82        //
83        // The following structures will be created in the exact order as showed:
84        // `block_ranges`: [
85        //    (block0, block8, 0 addresses),
86        //    (block9, block19, 2 addresses),
87        //    (block20, to_block, 5 addresses)
88        //  ]
89        // `filtered_addresses`: [a1, a2, a3, a4, a5]
90        //
91        // The first range will delete all receipts between block0 - block8
92        // The second range will delete all receipts between block9 - 19, except the ones with
93        //     emitter logs from these addresses: [a1, a2].
94        // The third range will delete all receipts between block20 - to_block, except the ones with
95        //     emitter logs from these addresses: [a1, a2, a3, a4, a5]
96        let mut block_ranges = vec![];
97        let mut blocks_iter = address_filter.iter().peekable();
98        let mut filtered_addresses = vec![];
99
100        while let Some((start_block, addresses)) = blocks_iter.next() {
101            filtered_addresses.extend_from_slice(addresses);
102
103            // This will clear all receipts before the first  appearance of a contract log or since
104            // the block after the last pruned one.
105            if block_ranges.is_empty() {
106                let init = last_pruned_block.map(|b| b + 1).unwrap_or_default();
107                if init < *start_block {
108                    block_ranges.push((init, *start_block - 1, 0));
109                }
110            }
111
112            let end_block =
113                blocks_iter.peek().map(|(next_block, _)| *next_block - 1).unwrap_or(to_block);
114
115            // Addresses in lower block ranges, are still included in the inclusion list for future
116            // ranges.
117            block_ranges.push((*start_block, end_block, filtered_addresses.len()));
118        }
119
120        trace!(
121            target: "pruner",
122            ?block_ranges,
123            ?filtered_addresses,
124            "Calculated block ranges and filtered addresses",
125        );
126
127        let mut limiter = input.limiter;
128
129        let mut done = true;
130        let mut pruned = 0;
131        let mut last_pruned_transaction = None;
132        for (start_block, end_block, num_addresses) in block_ranges {
133            let block_range = start_block..=end_block;
134
135            // Calculate the transaction range from this block range
136            let tx_range_end = match provider.block_body_indices(end_block)? {
137                Some(body) => body.last_tx_num(),
138                None => {
139                    trace!(
140                        target: "pruner",
141                        ?block_range,
142                        "No receipts to prune."
143                    );
144                    continue
145                }
146            };
147            let tx_range = from_tx_number..=tx_range_end;
148
149            // Delete receipts, except the ones in the inclusion list
150            let mut last_skipped_transaction = 0;
151            let deleted;
152            (deleted, done) = provider.tx_ref().prune_table_with_range::<tables::Receipts<
153                <Provider::Primitives as NodePrimitives>::Receipt,
154            >>(
155                tx_range,
156                &mut limiter,
157                |(tx_num, receipt)| {
158                    let skip = num_addresses > 0 &&
159                        receipt.logs().iter().any(|log| {
160                            filtered_addresses[..num_addresses].contains(&&log.address)
161                        });
162
163                    if skip {
164                        last_skipped_transaction = *tx_num;
165                    }
166                    skip
167                },
168                |row| last_pruned_transaction = Some(row.0),
169            )?;
170
171            trace!(target: "pruner", %deleted, %done, ?block_range, "Pruned receipts");
172
173            pruned += deleted;
174
175            // For accurate checkpoints we need to know that we have checked every transaction.
176            // Example: we reached the end of the range, and the last receipt is supposed to skip
177            // its deletion.
178            let last_pruned_transaction = *last_pruned_transaction
179                .insert(last_pruned_transaction.unwrap_or_default().max(last_skipped_transaction));
180
181            last_pruned_block = Some(
182                provider
183                    .transaction_block(last_pruned_transaction)?
184                    .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
185                    // If there's more receipts to prune, set the checkpoint block number to
186                    // previous, so we could finish pruning its receipts on the
187                    // next run.
188                    .saturating_sub(if done { 0 } else { 1 }),
189            );
190
191            if limiter.is_limit_reached() {
192                done &= end_block == to_block;
193                break
194            }
195
196            from_tx_number = last_pruned_transaction + 1;
197        }
198
199        // If there are contracts using `PruneMode::Distance(_)` there will be receipts before
200        // `to_block` that become eligible to be pruned in future runs. Therefore, our checkpoint is
201        // not actually `to_block`, but the `lowest_block_with_distance` from any contract.
202        // This ensures that in future pruner runs we can prune all these receipts between the
203        // previous `lowest_block_with_distance` and the new one using
204        // `get_next_tx_num_range_from_checkpoint`.
205        //
206        // Only applies if we were able to prune everything intended for this run, otherwise the
207        // checkpoint is the `last_pruned_block`.
208        let prune_mode_block = self
209            .config
210            .lowest_block_with_distance(input.to_block, initial_last_pruned_block)?
211            .unwrap_or(to_block);
212
213        provider.save_prune_checkpoint(
214            PruneSegment::ContractLogs,
215            PruneCheckpoint {
216                block_number: Some(prune_mode_block.min(last_pruned_block.unwrap_or(u64::MAX))),
217                tx_number: last_pruned_transaction,
218                prune_mode: PruneMode::Before(prune_mode_block),
219            },
220        )?;
221
222        let progress = limiter.progress(done);
223
224        Ok(SegmentOutput { progress, pruned, checkpoint: None })
225    }
226}
227
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, ReceiptsByLogs, Segment};
    use alloy_primitives::B256;
    use assert_matches::assert_matches;
    use reth_db_api::{cursor::DbCursorRO, tables, transaction::DbTx};
    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader, TransactionsProvider};
    use reth_prune_types::{PruneMode, PruneSegment, ReceiptsLogPruneConfig};
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{
        self, random_block_range, random_eoa_account, random_log, random_receipt, BlockRangeParams,
    };
    use std::collections::BTreeMap;

    /// Prunes receipts in repeated, limited runs with a single protected contract address and
    /// checks both the per-run bookkeeping and the final table contents.
    ///
    /// The last transaction of every block emits a log from the protected address, so from the
    /// configured `PruneMode::Before` block onwards exactly one receipt per block must survive.
    #[test]
    fn prune_receipts_by_logs() {
        reth_tracing::init_test_tracing();

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Three consecutive block ranges: blocks with transactions at both ends and a long
        // middle stretch generated with `tx_count: 0..1`.
        let tip = 20000;
        let blocks = [
            random_block_range(
                &mut rng,
                0..=100,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 1..5, ..Default::default() },
            ),
            random_block_range(
                &mut rng,
                (100 + 1)..=(tip - 100),
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
            ),
            random_block_range(
                &mut rng,
                (tip - 100 + 1)..=tip,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 1..5, ..Default::default() },
            ),
        ]
        .concat();
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let mut receipts = Vec::new();

        let (deposit_contract_addr, _) = random_eoa_account(&mut rng);
        for block in &blocks {
            // One receipt is pushed per transaction, so reserve by transaction count.
            receipts.reserve_exact(block.transaction_count());
            for (txi, transaction) in block.body().transactions.iter().enumerate() {
                let mut receipt = random_receipt(&mut rng, transaction, Some(1));
                // Only the last transaction of a block gets a log from the protected address;
                // every other transaction gets a log with no address override.
                receipt.logs.push(random_log(
                    &mut rng,
                    (txi == (block.transaction_count() - 1)).then_some(deposit_contract_addr),
                    Some(1),
                ));
                receipts.push((receipts.len() as u64, receipt));
            }
        }
        db.insert_receipts(receipts).expect("insert receipts");

        let total_transactions =
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>();

        // Sanity check: every transaction is stored and has exactly one receipt.
        assert_eq!(db.table::<tables::Transactions>().unwrap().len(), total_transactions);
        assert_eq!(
            db.table::<tables::Transactions>().unwrap().len(),
            db.table::<tables::Receipts>().unwrap().len()
        );

        // Performs a single limited prune run and returns whether the segment reported that
        // it is finished.
        let run_prune = || {
            let provider = db.factory.database_provider_rw().unwrap();

            let prune_before_block: u64 = 20;
            let prune_mode = PruneMode::Before(prune_before_block);
            let receipts_log_filter =
                ReceiptsLogPruneConfig(BTreeMap::from([(deposit_contract_addr, prune_mode)]));

            let limiter = PruneLimiter::default().set_deleted_entries_limit(10);

            let result = ReceiptsByLogs::new(receipts_log_filter).prune(
                &provider,
                PruneInput {
                    previous_checkpoint: db
                        .factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::ContractLogs)
                        .unwrap(),
                    to_block: tip,
                    limiter,
                },
            );
            provider.commit().expect("commit");

            assert_matches!(result, Ok(_));
            let output = result.unwrap();

            let (pruned_block, pruned_tx) = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::ContractLogs)
                .unwrap()
                .map(|checkpoint| (checkpoint.block_number.unwrap(), checkpoint.tx_number.unwrap()))
                .unwrap_or_default();

            // The protected receipt is always the last one of its block, so one receipt per
            // block from `prune_before_block` up to the checkpoint block must have been kept.
            let unprunable = pruned_block.saturating_sub(prune_before_block - 1);

            assert_eq!(
                db.table::<tables::Receipts>().unwrap().len(),
                total_transactions - ((pruned_tx + 1) - unprunable) as usize
            );

            output.progress.is_finished()
        };

        while !run_prune() {}

        let provider = db.factory.provider().unwrap();
        let mut cursor = provider.tx_ref().cursor_read::<tables::Receipts>().unwrap();
        let walker = cursor.walk(None).unwrap();
        for receipt in walker {
            let (tx_num, receipt) = receipt.unwrap();

            // A surviving receipt either carries a log from the protected contract or belongs
            // to the blocks above `tip - 128`, which are never pruned.
            assert!(
                receipt.logs.iter().any(|l| l.address == deposit_contract_addr) ||
                    provider.transaction_block(tx_num).unwrap().unwrap() > tip - 128,
            );
        }
    }
}