Skip to main content

reth_prune/segments/user/
receipts_by_logs.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{PruneInput, Segment},
4    PrunerError,
5};
6use alloy_consensus::TxReceipt;
7use reth_db_api::{table::Value, tables, transaction::DbTxMut};
8use reth_primitives_traits::NodePrimitives;
9use reth_provider::{
10    BlockReader, DBProvider, NodePrimitivesProvider, PruneCheckpointWriter, TransactionsProvider,
11};
12use reth_prune_types::{
13    PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig, SegmentOutput,
14    MINIMUM_UNWIND_SAFE_DISTANCE,
15};
16use tracing::{instrument, trace};
17#[derive(Debug)]
18pub struct ReceiptsByLogs {
19    config: ReceiptsLogPruneConfig,
20}
21
22impl ReceiptsByLogs {
23    pub const fn new(config: ReceiptsLogPruneConfig) -> Self {
24        Self { config }
25    }
26}
27
28impl<Provider> Segment<Provider> for ReceiptsByLogs
29where
30    Provider: DBProvider<Tx: DbTxMut>
31        + PruneCheckpointWriter
32        + TransactionsProvider
33        + BlockReader
34        + NodePrimitivesProvider<Primitives: NodePrimitives<Receipt: Value>>,
35{
36    fn segment(&self) -> PruneSegment {
37        PruneSegment::ContractLogs
38    }
39
40    fn mode(&self) -> Option<PruneMode> {
41        None
42    }
43
44    fn purpose(&self) -> PrunePurpose {
45        PrunePurpose::User
46    }
47
48    #[instrument(
49        name = "ReceiptsByLogs::prune",
50        target = "pruner",
51        skip(self, provider),
52        ret(level = "trace")
53    )]
54    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
55        // Contract log filtering removes every receipt possible except the ones in the list. So,
56        // for the other receipts it's as if they had a `PruneMode::Distance()` of
57        // `MINIMUM_UNWIND_SAFE_DISTANCE`.
58        let to_block = PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)
59            .prune_target_block(input.to_block, PruneSegment::ContractLogs, PrunePurpose::User)?
60            .map(|(bn, _)| bn)
61            .unwrap_or_default();
62
63        // Get status checkpoint from latest run
64        let mut last_pruned_block =
65            input.previous_checkpoint.and_then(|checkpoint| checkpoint.block_number);
66
67        let initial_last_pruned_block = last_pruned_block;
68
69        let mut from_tx_number = match initial_last_pruned_block {
70            Some(block) => provider
71                .block_body_indices(block)?
72                .map(|block| block.last_tx_num() + 1)
73                .unwrap_or(0),
74            None => 0,
75        };
76
77        // Figure out what receipts have already been pruned, so we can have an accurate
78        // `address_filter`
79        let address_filter = self.config.group_by_block(input.to_block, last_pruned_block)?;
80
81        // Splits all transactions in different block ranges. Each block range will have its own
82        // filter address list and will check it while going through the table
83        //
84        // Example:
85        // For an `address_filter` such as:
86        // { block9: [a1, a2], block20: [a3, a4, a5] }
87        //
88        // The following structures will be created in the exact order as showed:
89        // `block_ranges`: [
90        //    (block0, block8, 0 addresses),
91        //    (block9, block19, 2 addresses),
92        //    (block20, to_block, 5 addresses)
93        //  ]
94        // `filtered_addresses`: [a1, a2, a3, a4, a5]
95        //
96        // The first range will delete all receipts between block0 - block8
97        // The second range will delete all receipts between block9 - 19, except the ones with
98        //     emitter logs from these addresses: [a1, a2].
99        // The third range will delete all receipts between block20 - to_block, except the ones with
100        //     emitter logs from these addresses: [a1, a2, a3, a4, a5]
101        let mut block_ranges = vec![];
102        let mut blocks_iter = address_filter.iter().peekable();
103        let mut filtered_addresses = vec![];
104
105        while let Some((start_block, addresses)) = blocks_iter.next() {
106            filtered_addresses.extend_from_slice(addresses);
107
108            // This will clear all receipts before the first  appearance of a contract log or since
109            // the block after the last pruned one.
110            if block_ranges.is_empty() {
111                let init = last_pruned_block.map(|b| b + 1).unwrap_or_default();
112                if init < *start_block {
113                    block_ranges.push((init, *start_block - 1, 0));
114                }
115            }
116
117            let end_block =
118                blocks_iter.peek().map(|(next_block, _)| *next_block - 1).unwrap_or(to_block);
119
120            // Addresses in lower block ranges, are still included in the inclusion list for future
121            // ranges.
122            block_ranges.push((*start_block, end_block, filtered_addresses.len()));
123        }
124
125        trace!(
126            target: "pruner",
127            ?block_ranges,
128            ?filtered_addresses,
129            "Calculated block ranges and filtered addresses",
130        );
131
132        let mut limiter = input.limiter;
133
134        let mut done = true;
135        let mut pruned = 0;
136        let mut last_pruned_transaction = None;
137        for (start_block, end_block, num_addresses) in block_ranges {
138            let block_range = start_block..=end_block;
139
140            // Calculate the transaction range from this block range
141            let tx_range_end = match provider.block_body_indices(end_block)? {
142                Some(body) => body.last_tx_num(),
143                None => {
144                    trace!(
145                        target: "pruner",
146                        ?block_range,
147                        "No receipts to prune."
148                    );
149                    continue
150                }
151            };
152            let tx_range = from_tx_number..=tx_range_end;
153
154            // Delete receipts, except the ones in the inclusion list
155            let mut last_skipped_transaction = 0;
156            let deleted;
157            (deleted, done) = provider.tx_ref().prune_table_with_range::<tables::Receipts<
158                <Provider::Primitives as NodePrimitives>::Receipt,
159            >>(
160                tx_range,
161                &mut limiter,
162                |(tx_num, receipt)| {
163                    let skip = num_addresses > 0 &&
164                        receipt.logs().iter().any(|log| {
165                            filtered_addresses[..num_addresses].contains(&&log.address)
166                        });
167
168                    if skip {
169                        last_skipped_transaction = *tx_num;
170                    }
171                    skip
172                },
173                |row| last_pruned_transaction = Some(row.0),
174            )?;
175
176            trace!(target: "pruner", %deleted, %done, ?block_range, "Pruned receipts");
177
178            pruned += deleted;
179
180            // For accurate checkpoints we need to know that we have checked every transaction.
181            // Example: we reached the end of the range, and the last receipt is supposed to skip
182            // its deletion.
183            let last_pruned_transaction = *last_pruned_transaction
184                .insert(last_pruned_transaction.unwrap_or_default().max(last_skipped_transaction));
185
186            last_pruned_block = Some(
187                provider
188                    .block_by_transaction_id(last_pruned_transaction)?
189                    .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
190                    // If there's more receipts to prune, set the checkpoint block number to
191                    // previous, so we could finish pruning its receipts on the
192                    // next run.
193                    .saturating_sub(if done { 0 } else { 1 }),
194            );
195
196            if limiter.is_limit_reached() {
197                done &= end_block == to_block;
198                break
199            }
200
201            from_tx_number = last_pruned_transaction + 1;
202        }
203
204        // If there are contracts using `PruneMode::Distance(_)` there will be receipts before
205        // `to_block` that become eligible to be pruned in future runs. Therefore, our checkpoint is
206        // not actually `to_block`, but the `lowest_block_with_distance` from any contract.
207        // This ensures that in future pruner runs we can prune all these receipts between the
208        // previous `lowest_block_with_distance` and the new one using
209        // `get_next_tx_num_range_from_checkpoint`.
210        //
211        // Only applies if we were able to prune everything intended for this run, otherwise the
212        // checkpoint is the `last_pruned_block`.
213        let prune_mode_block = self
214            .config
215            .lowest_block_with_distance(input.to_block, initial_last_pruned_block)?
216            .unwrap_or(to_block);
217
218        provider.save_prune_checkpoint(
219            PruneSegment::ContractLogs,
220            PruneCheckpoint {
221                block_number: Some(prune_mode_block.min(last_pruned_block.unwrap_or(u64::MAX))),
222                tx_number: last_pruned_transaction,
223                prune_mode: PruneMode::Before(prune_mode_block),
224            },
225        )?;
226
227        let progress = limiter.progress(done);
228
229        Ok(SegmentOutput { progress, pruned, checkpoint: None })
230    }
231}
232
#[cfg(test)]
mod tests {
    use crate::segments::{user::ReceiptsByLogs, PruneInput, PruneLimiter, Segment};
    use alloy_primitives::B256;
    use assert_matches::assert_matches;
    use reth_db_api::{cursor::DbCursorRO, tables, transaction::DbTx};
    use reth_provider::{BlockReader, DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{PruneMode, PruneSegment, ReceiptsLogPruneConfig};
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{
        self, random_block_range, random_eoa_account, random_log, random_receipt, BlockRangeParams,
    };
    use std::collections::BTreeMap;

    /// Repeatedly runs the segment with a small deletion limit until it reports completion.
    /// After each run it checks that the number of remaining receipts matches the saved
    /// checkpoint. At the end it checks that only receipts from the retained contract (or from
    /// recent blocks) survive.
    #[test]
    fn prune_receipts_by_logs() {
        reth_tracing::init_test_tracing();

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let tip = 20000;
        let blocks = [
            random_block_range(
                &mut rng,
                0..=100,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 1..5, ..Default::default() },
            ),
            random_block_range(
                &mut rng,
                (100 + 1)..=(tip - 100),
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
            ),
            random_block_range(
                &mut rng,
                (tip - 100 + 1)..=tip,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 1..5, ..Default::default() },
            ),
        ]
        .concat();
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // One receipt is generated per transaction, so this is also the exact receipt count.
        let total_transactions: usize = blocks.iter().map(|block| block.transaction_count()).sum();

        let mut receipts = Vec::with_capacity(total_transactions);

        let (deposit_contract_addr, _) = random_eoa_account(&mut rng);
        for block in &blocks {
            for (txi, transaction) in block.body().transactions.iter().enumerate() {
                let mut receipt = random_receipt(&mut rng, transaction, Some(1), None);
                // Only the last transaction of each block emits a log from the retained contract.
                receipt.logs.push(random_log(
                    &mut rng,
                    (txi == (block.transaction_count() - 1)).then_some(deposit_contract_addr),
                    Some(1),
                ));
                receipts.push((receipts.len() as u64, receipt));
            }
        }
        db.insert_receipts(receipts).expect("insert receipts");

        assert_eq!(db.table::<tables::Transactions>().unwrap().len(), total_transactions);
        assert_eq!(
            db.table::<tables::Transactions>().unwrap().len(),
            db.table::<tables::Receipts>().unwrap().len()
        );

        let run_prune = || {
            let provider = db.factory.database_provider_rw().unwrap();

            let prune_before_block: u64 = 20;
            let prune_mode = PruneMode::Before(prune_before_block);
            let receipts_log_filter =
                ReceiptsLogPruneConfig(BTreeMap::from([(deposit_contract_addr, prune_mode)]));

            let limiter = PruneLimiter::default().set_deleted_entries_limit(10);

            let result = ReceiptsByLogs::new(receipts_log_filter).prune(
                &provider,
                PruneInput {
                    previous_checkpoint: db
                        .factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::ContractLogs)
                        .unwrap(),
                    to_block: tip,
                    limiter,
                },
            );
            provider.commit().expect("commit");

            assert_matches!(result, Ok(_));
            let output = result.unwrap();

            let (pruned_block, pruned_tx) = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::ContractLogs)
                .unwrap()
                .map(|checkpoint| (checkpoint.block_number.unwrap(), checkpoint.tx_number.unwrap()))
                .unwrap_or_default();

            // All receipts are in the end of the block
            let unprunable = pruned_block.saturating_sub(prune_before_block - 1);

            assert_eq!(
                db.table::<tables::Receipts>().unwrap().len(),
                total_transactions - ((pruned_tx + 1) - unprunable) as usize
            );

            output.progress.is_finished()
        };

        while !run_prune() {}

        let provider = db.factory.provider().unwrap();
        let mut cursor = provider.tx_ref().cursor_read::<tables::Receipts>().unwrap();
        let walker = cursor.walk(None).unwrap();
        for receipt in walker {
            let (tx_num, receipt) = receipt.unwrap();

            // Either we only find our contract, or the receipt is part of the unprunable receipts
            // set by tip - 128
            assert!(
                receipt.logs.iter().any(|l| l.address == deposit_contract_addr) ||
                    provider.block_by_transaction_id(tx_num).unwrap().unwrap() > tip - 128,
            );
        }
    }
}