// reth_prune/segments/user/receipts_by_logs.rs
1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment},
4 PrunerError,
5};
6use alloy_consensus::TxReceipt;
7use reth_db_api::{table::Value, tables, transaction::DbTxMut};
8use reth_primitives_traits::NodePrimitives;
9use reth_provider::{
10 BlockReader, DBProvider, NodePrimitivesProvider, PruneCheckpointWriter, TransactionsProvider,
11};
12use reth_prune_types::{
13 PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig, SegmentOutput,
14 MINIMUM_PRUNING_DISTANCE,
15};
16use tracing::{instrument, trace};
17#[derive(Debug)]
18pub struct ReceiptsByLogs {
19 config: ReceiptsLogPruneConfig,
20}
21
22impl ReceiptsByLogs {
23 pub const fn new(config: ReceiptsLogPruneConfig) -> Self {
24 Self { config }
25 }
26}
27
28impl<Provider> Segment<Provider> for ReceiptsByLogs
29where
30 Provider: DBProvider<Tx: DbTxMut>
31 + PruneCheckpointWriter
32 + TransactionsProvider
33 + BlockReader
34 + NodePrimitivesProvider<Primitives: NodePrimitives<Receipt: Value>>,
35{
36 fn segment(&self) -> PruneSegment {
37 PruneSegment::ContractLogs
38 }
39
40 fn mode(&self) -> Option<PruneMode> {
41 None
42 }
43
44 fn purpose(&self) -> PrunePurpose {
45 PrunePurpose::User
46 }
47
48 #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
49 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
50 let to_block = PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)
54 .prune_target_block(input.to_block, PruneSegment::ContractLogs, PrunePurpose::User)?
55 .map(|(bn, _)| bn)
56 .unwrap_or_default();
57
58 let mut last_pruned_block =
60 input.previous_checkpoint.and_then(|checkpoint| checkpoint.block_number);
61
62 let initial_last_pruned_block = last_pruned_block;
63
64 let mut from_tx_number = match initial_last_pruned_block {
65 Some(block) => provider
66 .block_body_indices(block)?
67 .map(|block| block.last_tx_num() + 1)
68 .unwrap_or(0),
69 None => 0,
70 };
71
72 let address_filter = self.config.group_by_block(input.to_block, last_pruned_block)?;
75
76 let mut block_ranges = vec![];
97 let mut blocks_iter = address_filter.iter().peekable();
98 let mut filtered_addresses = vec![];
99
100 while let Some((start_block, addresses)) = blocks_iter.next() {
101 filtered_addresses.extend_from_slice(addresses);
102
103 if block_ranges.is_empty() {
106 let init = last_pruned_block.map(|b| b + 1).unwrap_or_default();
107 if init < *start_block {
108 block_ranges.push((init, *start_block - 1, 0));
109 }
110 }
111
112 let end_block =
113 blocks_iter.peek().map(|(next_block, _)| *next_block - 1).unwrap_or(to_block);
114
115 block_ranges.push((*start_block, end_block, filtered_addresses.len()));
118 }
119
120 trace!(
121 target: "pruner",
122 ?block_ranges,
123 ?filtered_addresses,
124 "Calculated block ranges and filtered addresses",
125 );
126
127 let mut limiter = input.limiter;
128
129 let mut done = true;
130 let mut pruned = 0;
131 let mut last_pruned_transaction = None;
132 for (start_block, end_block, num_addresses) in block_ranges {
133 let block_range = start_block..=end_block;
134
135 let tx_range_end = match provider.block_body_indices(end_block)? {
137 Some(body) => body.last_tx_num(),
138 None => {
139 trace!(
140 target: "pruner",
141 ?block_range,
142 "No receipts to prune."
143 );
144 continue
145 }
146 };
147 let tx_range = from_tx_number..=tx_range_end;
148
149 let mut last_skipped_transaction = 0;
151 let deleted;
152 (deleted, done) = provider.tx_ref().prune_table_with_range::<tables::Receipts<
153 <Provider::Primitives as NodePrimitives>::Receipt,
154 >>(
155 tx_range,
156 &mut limiter,
157 |(tx_num, receipt)| {
158 let skip = num_addresses > 0 &&
159 receipt.logs().iter().any(|log| {
160 filtered_addresses[..num_addresses].contains(&&log.address)
161 });
162
163 if skip {
164 last_skipped_transaction = *tx_num;
165 }
166 skip
167 },
168 |row| last_pruned_transaction = Some(row.0),
169 )?;
170
171 trace!(target: "pruner", %deleted, %done, ?block_range, "Pruned receipts");
172
173 pruned += deleted;
174
175 let last_pruned_transaction = *last_pruned_transaction
179 .insert(last_pruned_transaction.unwrap_or_default().max(last_skipped_transaction));
180
181 last_pruned_block = Some(
182 provider
183 .transaction_block(last_pruned_transaction)?
184 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
185 .saturating_sub(if done { 0 } else { 1 }),
189 );
190
191 if limiter.is_limit_reached() {
192 done &= end_block == to_block;
193 break
194 }
195
196 from_tx_number = last_pruned_transaction + 1;
197 }
198
199 let prune_mode_block = self
209 .config
210 .lowest_block_with_distance(input.to_block, initial_last_pruned_block)?
211 .unwrap_or(to_block);
212
213 provider.save_prune_checkpoint(
214 PruneSegment::ContractLogs,
215 PruneCheckpoint {
216 block_number: Some(prune_mode_block.min(last_pruned_block.unwrap_or(u64::MAX))),
217 tx_number: last_pruned_transaction,
218 prune_mode: PruneMode::Before(prune_mode_block),
219 },
220 )?;
221
222 let progress = limiter.progress(done);
223
224 Ok(SegmentOutput { progress, pruned, checkpoint: None })
225 }
226}
227
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, ReceiptsByLogs, Segment};
    use alloy_primitives::B256;
    use assert_matches::assert_matches;
    use reth_db_api::{cursor::DbCursorRO, tables, transaction::DbTx};
    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader, TransactionsProvider};
    use reth_prune_types::{PruneMode, PruneSegment, ReceiptsLogPruneConfig};
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{
        self, random_block_range, random_eoa_account, random_log, random_receipt, BlockRangeParams,
    };
    use std::collections::BTreeMap;

    /// Repeatedly runs the segment with a small deletion limit until it reports completion, and
    /// checks after every run that the number of remaining receipts matches the checkpoint.
    #[test]
    fn prune_receipts_by_logs() {
        reth_tracing::init_test_tracing();

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Three consecutive block ranges: blocks with transactions at the start and at the tip,
        // and a long stretch of empty blocks (`tx_count: 0..1`) in between.
        let tip = 20000;
        let blocks = [
            random_block_range(
                &mut rng,
                0..=100,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 1..5, ..Default::default() },
            ),
            random_block_range(
                &mut rng,
                (100 + 1)..=(tip - 100),
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
            ),
            random_block_range(
                &mut rng,
                (tip - 100 + 1)..=tip,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 1..5, ..Default::default() },
            ),
        ]
        .concat();
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let mut receipts = Vec::new();

        let (deposit_contract_addr, _) = random_eoa_account(&mut rng);
        for block in &blocks {
            // One receipt is pushed per transaction, so reserve by transaction count.
            receipts.reserve_exact(block.transaction_count());
            for (txi, transaction) in block.body().transactions.iter().enumerate() {
                let mut receipt = random_receipt(&mut rng, transaction, Some(1));
                // Only the last transaction of every block gets a log from the deposit contract.
                receipt.logs.push(random_log(
                    &mut rng,
                    (txi == (block.transaction_count() - 1)).then_some(deposit_contract_addr),
                    Some(1),
                ));
                receipts.push((receipts.len() as u64, receipt));
            }
        }
        db.insert_receipts(receipts).expect("insert receipts");

        // Sanity checks: one stored transaction per generated transaction, one receipt each.
        assert_eq!(
            db.table::<tables::Transactions>().unwrap().len(),
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
        );
        assert_eq!(
            db.table::<tables::Transactions>().unwrap().len(),
            db.table::<tables::Receipts>().unwrap().len()
        );

        // Runs a single prune pass limited to 10 deleted entries and returns whether the segment
        // reported that it is finished.
        let run_prune = || {
            let provider = db.factory.database_provider_rw().unwrap();

            let prune_before_block: usize = 20;
            let prune_mode = PruneMode::Before(prune_before_block as u64);
            let receipts_log_filter =
                ReceiptsLogPruneConfig(BTreeMap::from([(deposit_contract_addr, prune_mode)]));

            let limiter = PruneLimiter::default().set_deleted_entries_limit(10);

            let result = ReceiptsByLogs::new(receipts_log_filter).prune(
                &provider,
                PruneInput {
                    previous_checkpoint: db
                        .factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::ContractLogs)
                        .unwrap(),
                    to_block: tip,
                    limiter,
                },
            );
            provider.commit().expect("commit");

            assert_matches!(result, Ok(_));
            let output = result.unwrap();

            let (pruned_block, pruned_tx) = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::ContractLogs)
                .unwrap()
                .map(|checkpoint| (checkpoint.block_number.unwrap(), checkpoint.tx_number.unwrap()))
                .unwrap_or_default();

            // The retained receipts are the last ones of their blocks (see receipt generation
            // above), so one receipt per block from `prune_before_block` up to the checkpoint
            // block is expected to survive.
            let unprunable = pruned_block.saturating_sub(prune_before_block as u64 - 1);

            assert_eq!(
                db.table::<tables::Receipts>().unwrap().len(),
                blocks.iter().map(|block| block.transaction_count()).sum::<usize>() -
                    ((pruned_tx + 1) - unprunable) as usize
            );

            output.progress.is_finished()
        };

        while !run_prune() {}

        let provider = db.factory.provider().unwrap();
        let mut cursor = provider.tx_ref().cursor_read::<tables::Receipts>().unwrap();
        let walker = cursor.walk(None).unwrap();
        for receipt in walker {
            let (tx_num, receipt) = receipt.unwrap();

            // Every surviving receipt either contains a log from the deposit contract or belongs
            // to one of the most recent blocks that are never pruned.
            // NOTE(review): `128` is hard-coded here while `prune` derives its target from
            // `MINIMUM_PRUNING_DISTANCE` — confirm the two are still meant to agree.
            assert!(
                receipt.logs.iter().any(|l| l.address == deposit_contract_addr) ||
                    provider.transaction_block(tx_num).unwrap().unwrap() > tip - 128,
            );
        }
    }
}