reth_prune/segments/user/
receipts_by_logs.rs1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment},
4 PrunerError,
5};
6use alloy_consensus::TxReceipt;
7use reth_db_api::{table::Value, tables, transaction::DbTxMut};
8use reth_primitives_traits::NodePrimitives;
9use reth_provider::{
10 BlockReader, DBProvider, NodePrimitivesProvider, PruneCheckpointWriter, TransactionsProvider,
11};
12use reth_prune_types::{
13 PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig, SegmentOutput,
14 MINIMUM_UNWIND_SAFE_DISTANCE,
15};
16use tracing::{instrument, trace};
17#[derive(Debug)]
18pub struct ReceiptsByLogs {
19 config: ReceiptsLogPruneConfig,
20}
21
22impl ReceiptsByLogs {
23 pub const fn new(config: ReceiptsLogPruneConfig) -> Self {
24 Self { config }
25 }
26}
27
28impl<Provider> Segment<Provider> for ReceiptsByLogs
29where
30 Provider: DBProvider<Tx: DbTxMut>
31 + PruneCheckpointWriter
32 + TransactionsProvider
33 + BlockReader
34 + NodePrimitivesProvider<Primitives: NodePrimitives<Receipt: Value>>,
35{
36 fn segment(&self) -> PruneSegment {
37 PruneSegment::ContractLogs
38 }
39
40 fn mode(&self) -> Option<PruneMode> {
41 None
42 }
43
44 fn purpose(&self) -> PrunePurpose {
45 PrunePurpose::User
46 }
47
48 #[instrument(
49 name = "ReceiptsByLogs::prune",
50 target = "pruner",
51 skip(self, provider),
52 ret(level = "trace")
53 )]
54 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
55 let to_block = PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)
59 .prune_target_block(input.to_block, PruneSegment::ContractLogs, PrunePurpose::User)?
60 .map(|(bn, _)| bn)
61 .unwrap_or_default();
62
63 let mut last_pruned_block =
65 input.previous_checkpoint.and_then(|checkpoint| checkpoint.block_number);
66
67 let initial_last_pruned_block = last_pruned_block;
68
69 let mut from_tx_number = match initial_last_pruned_block {
70 Some(block) => provider
71 .block_body_indices(block)?
72 .map(|block| block.last_tx_num() + 1)
73 .unwrap_or(0),
74 None => 0,
75 };
76
77 let address_filter = self.config.group_by_block(input.to_block, last_pruned_block)?;
80
81 let mut block_ranges = vec![];
102 let mut blocks_iter = address_filter.iter().peekable();
103 let mut filtered_addresses = vec![];
104
105 while let Some((start_block, addresses)) = blocks_iter.next() {
106 filtered_addresses.extend_from_slice(addresses);
107
108 if block_ranges.is_empty() {
111 let init = last_pruned_block.map(|b| b + 1).unwrap_or_default();
112 if init < *start_block {
113 block_ranges.push((init, *start_block - 1, 0));
114 }
115 }
116
117 let end_block =
118 blocks_iter.peek().map(|(next_block, _)| *next_block - 1).unwrap_or(to_block);
119
120 block_ranges.push((*start_block, end_block, filtered_addresses.len()));
123 }
124
125 trace!(
126 target: "pruner",
127 ?block_ranges,
128 ?filtered_addresses,
129 "Calculated block ranges and filtered addresses",
130 );
131
132 let mut limiter = input.limiter;
133
134 let mut done = true;
135 let mut pruned = 0;
136 let mut last_pruned_transaction = None;
137 for (start_block, end_block, num_addresses) in block_ranges {
138 let block_range = start_block..=end_block;
139
140 let tx_range_end = match provider.block_body_indices(end_block)? {
142 Some(body) => body.last_tx_num(),
143 None => {
144 trace!(
145 target: "pruner",
146 ?block_range,
147 "No receipts to prune."
148 );
149 continue
150 }
151 };
152 let tx_range = from_tx_number..=tx_range_end;
153
154 let mut last_skipped_transaction = 0;
156 let deleted;
157 (deleted, done) = provider.tx_ref().prune_table_with_range::<tables::Receipts<
158 <Provider::Primitives as NodePrimitives>::Receipt,
159 >>(
160 tx_range,
161 &mut limiter,
162 |(tx_num, receipt)| {
163 let skip = num_addresses > 0 &&
164 receipt.logs().iter().any(|log| {
165 filtered_addresses[..num_addresses].contains(&&log.address)
166 });
167
168 if skip {
169 last_skipped_transaction = *tx_num;
170 }
171 skip
172 },
173 |row| last_pruned_transaction = Some(row.0),
174 )?;
175
176 trace!(target: "pruner", %deleted, %done, ?block_range, "Pruned receipts");
177
178 pruned += deleted;
179
180 let last_pruned_transaction = *last_pruned_transaction
184 .insert(last_pruned_transaction.unwrap_or_default().max(last_skipped_transaction));
185
186 last_pruned_block = Some(
187 provider
188 .block_by_transaction_id(last_pruned_transaction)?
189 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
190 .saturating_sub(if done { 0 } else { 1 }),
194 );
195
196 if limiter.is_limit_reached() {
197 done &= end_block == to_block;
198 break
199 }
200
201 from_tx_number = last_pruned_transaction + 1;
202 }
203
204 let prune_mode_block = self
214 .config
215 .lowest_block_with_distance(input.to_block, initial_last_pruned_block)?
216 .unwrap_or(to_block);
217
218 provider.save_prune_checkpoint(
219 PruneSegment::ContractLogs,
220 PruneCheckpoint {
221 block_number: Some(prune_mode_block.min(last_pruned_block.unwrap_or(u64::MAX))),
222 tx_number: last_pruned_transaction,
223 prune_mode: PruneMode::Before(prune_mode_block),
224 },
225 )?;
226
227 let progress = limiter.progress(done);
228
229 Ok(SegmentOutput { progress, pruned, checkpoint: None })
230 }
231}
232
#[cfg(test)]
mod tests {
    use crate::segments::{user::ReceiptsByLogs, PruneInput, PruneLimiter, Segment};
    use alloy_primitives::B256;
    use assert_matches::assert_matches;
    use reth_db_api::{cursor::DbCursorRO, tables, transaction::DbTx};
    use reth_primitives_traits::InMemorySize;
    use reth_provider::{BlockReader, DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{PruneMode, PruneSegment, ReceiptsLogPruneConfig};
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{
        self, random_block_range, random_eoa_account, random_log, random_receipt, BlockRangeParams,
    };
    use std::collections::BTreeMap;

    /// Runs the segment repeatedly with a small deletion limit until it reports completion,
    /// checking the receipt count against the saved checkpoint after every run. At the end,
    /// every remaining receipt must either carry a log from the protected address or belong to
    /// a block above `tip - 128`.
    #[test]
    fn prune_receipts_by_logs() {
        reth_tracing::init_test_tracing();

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let tip = 20000;

        // Three consecutive block ranges: blocks with transactions at both ends and an
        // (almost) empty stretch in between.
        let params = |tx_count| BlockRangeParams {
            parent: Some(B256::ZERO),
            tx_count,
            ..Default::default()
        };
        let mut blocks = random_block_range(&mut rng, 0..=100, params(1..5));
        blocks.extend(random_block_range(&mut rng, (100 + 1)..=(tip - 100), params(0..1)));
        blocks.extend(random_block_range(&mut rng, (tip - 100 + 1)..=tip, params(1..5)));
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let mut receipts = Vec::new();

        // Only the last transaction of each block gets a log from the protected address; every
        // other receipt gets a log with a random address.
        let (deposit_contract_addr, _) = random_eoa_account(&mut rng);
        for block in &blocks {
            receipts.reserve_exact(block.body().size());
            for (txi, transaction) in block.body().transactions.iter().enumerate() {
                let mut receipt = random_receipt(&mut rng, transaction, Some(1), None);
                let is_last_tx = txi + 1 == block.transaction_count();
                let log = random_log(&mut rng, is_last_tx.then_some(deposit_contract_addr), Some(1));
                receipt.logs.push(log);

                let tx_num = receipts.len() as u64;
                receipts.push((tx_num, receipt));
            }
        }
        db.insert_receipts(receipts).expect("insert receipts");

        // Every transaction must have exactly one receipt before pruning starts.
        assert_eq!(
            db.table::<tables::Transactions>().unwrap().len(),
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
        );
        assert_eq!(
            db.table::<tables::Transactions>().unwrap().len(),
            db.table::<tables::Receipts>().unwrap().len()
        );

        // Performs a single pruning run and returns whether the segment reported completion.
        let run_prune = || {
            let provider = db.factory.database_provider_rw().unwrap();

            let prune_before_block: u64 = 20;
            let segment = ReceiptsByLogs::new(ReceiptsLogPruneConfig(BTreeMap::from([(
                deposit_contract_addr,
                PruneMode::Before(prune_before_block),
            )])));

            let limiter = PruneLimiter::default().set_deleted_entries_limit(10);

            let previous_checkpoint = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::ContractLogs)
                .unwrap();

            let result =
                segment.prune(&provider, PruneInput { previous_checkpoint, to_block: tip, limiter });
            provider.commit().expect("commit");

            assert_matches!(result, Ok(_));
            let output = result.unwrap();

            let saved_checkpoint = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::ContractLogs)
                .unwrap();
            let (pruned_block, pruned_tx) = match saved_checkpoint {
                Some(checkpoint) => {
                    (checkpoint.block_number.unwrap(), checkpoint.tx_number.unwrap())
                }
                None => (0, 0),
            };

            // Receipts up to the checkpoint that were kept because of the address filter.
            let unprunable = pruned_block.saturating_sub(prune_before_block - 1);

            assert_eq!(
                db.table::<tables::Receipts>().unwrap().len(),
                blocks.iter().map(|block| block.transaction_count()).sum::<usize>() -
                    ((pruned_tx + 1) - unprunable) as usize
            );

            output.progress.is_finished()
        };

        loop {
            if run_prune() {
                break
            }
        }

        let provider = db.factory.provider().unwrap();
        let mut cursor = provider.tx_ref().cursor_read::<tables::Receipts>().unwrap();
        for entry in cursor.walk(None).unwrap() {
            let (tx_num, receipt) = entry.unwrap();

            // A surviving receipt is either protected by the filter or too close to the tip.
            assert!(
                receipt.logs.iter().any(|l| l.address == deposit_contract_addr) ||
                    provider.block_by_transaction_id(tx_num).unwrap().unwrap() > tip - 128,
            );
        }
    }
}