1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment, SegmentOutput},
4 PrunerError,
5};
6use alloy_consensus::transaction::TxHashRef;
7use rayon::prelude::*;
8use reth_db_api::{tables, transaction::DbTxMut};
9use reth_primitives_traits::SignedTransaction;
10use reth_provider::{
11 BlockReader, DBProvider, PruneCheckpointReader, RocksDBProviderFactory,
12 StaticFileProviderFactory,
13};
14use reth_prune_types::{
15 PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutputCheckpoint,
16};
17use reth_static_file_types::StaticFileSegment;
18use reth_storage_api::StorageSettingsCache;
19use tracing::{debug, instrument, trace};
20
21#[derive(Debug)]
22pub struct TransactionLookup {
23 mode: PruneMode,
24}
25
26impl TransactionLookup {
27 pub const fn new(mode: PruneMode) -> Self {
28 Self { mode }
29 }
30}
31
32impl<Provider> Segment<Provider> for TransactionLookup
33where
34 Provider: DBProvider<Tx: DbTxMut>
35 + BlockReader<Transaction: SignedTransaction>
36 + PruneCheckpointReader
37 + StaticFileProviderFactory
38 + StorageSettingsCache
39 + RocksDBProviderFactory,
40{
41 fn segment(&self) -> PruneSegment {
42 PruneSegment::TransactionLookup
43 }
44
45 fn mode(&self) -> Option<PruneMode> {
46 Some(self.mode)
47 }
48
49 fn purpose(&self) -> PrunePurpose {
50 PrunePurpose::User
51 }
52
53 #[instrument(
54 name = "TransactionLookup::prune",
55 target = "pruner",
56 skip(self, provider),
57 ret(level = "trace")
58 )]
59 fn prune(
60 &self,
61 provider: &Provider,
62 mut input: PruneInput,
63 ) -> Result<SegmentOutput, PrunerError> {
64 if let Some(lowest_range) =
69 provider.static_file_provider().get_lowest_range(StaticFileSegment::Transactions) &&
70 input
71 .previous_checkpoint
72 .is_none_or(|checkpoint| checkpoint.block_number < Some(lowest_range.start()))
73 {
74 let new_checkpoint = lowest_range.start().saturating_sub(1);
75 if let Some(body_indices) = provider.block_body_indices(new_checkpoint)? {
76 input.previous_checkpoint = Some(PruneCheckpoint {
77 block_number: Some(new_checkpoint),
78 tx_number: Some(body_indices.last_tx_num()),
79 prune_mode: self.mode,
80 });
81 debug!(
82 target: "pruner",
83 static_file_checkpoint = ?input.previous_checkpoint,
84 "Using static file transaction checkpoint as TransactionLookup starting point"
85 );
86 }
87 }
88
89 let (start, end) = match input.get_next_tx_num_range(provider)? {
90 Some(range) => range,
91 None => {
92 trace!(target: "pruner", "No transaction lookup entries to prune");
93 return Ok(SegmentOutput::done())
94 }
95 }
96 .into_inner();
97
98 if provider.cached_storage_settings().storage_v2 {
100 return self.prune_rocksdb(provider, input, start, end);
101 }
102
103 if self.mode.is_full() {
105 let pruned = provider.tx_ref().clear_table::<tables::TransactionHashNumbers>()?;
106 trace!(target: "pruner", %pruned, "Cleared transaction lookup table");
107
108 let last_pruned_block = provider
109 .block_by_transaction_id(end)?
110 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;
111
112 return Ok(SegmentOutput {
113 progress: PruneProgress::Finished,
114 pruned,
115 checkpoint: Some(SegmentOutputCheckpoint {
116 block_number: Some(last_pruned_block),
117 tx_number: Some(end),
118 }),
119 });
120 }
121
122 let tx_range = start..=
123 Some(end)
124 .min(
125 input
126 .limiter
127 .deleted_entries_limit_left()
128 .map(|left| start.saturating_add(left as u64) - 1),
131 )
132 .unwrap();
133 let tx_range_end = *tx_range.end();
134
135 let mut hashes = provider
137 .transactions_by_tx_range(tx_range.clone())?
138 .into_par_iter()
139 .map(|transaction| *transaction.tx_hash())
140 .collect::<Vec<_>>();
141
142 hashes.sort_unstable();
146
147 let tx_count = tx_range.count();
149 if hashes.len() != tx_count {
150 return Err(PrunerError::InconsistentData(
151 "Unexpected number of transaction hashes retrieved by transaction number range",
152 ))
153 }
154
155 let mut limiter = input.limiter;
156
157 let mut last_pruned_transaction = None;
158 let (pruned, done) =
159 provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
160 hashes,
161 &mut limiter,
162 |row| {
163 last_pruned_transaction =
164 Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
165 },
166 )?;
167
168 let done = done && tx_range_end == end;
169 trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");
170
171 let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);
172
173 let last_pruned_block = provider
174 .block_by_transaction_id(last_pruned_transaction)?
175 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
176 .checked_sub(if done { 0 } else { 1 });
180
181 let progress = limiter.progress(done);
182
183 Ok(SegmentOutput {
184 progress,
185 pruned,
186 checkpoint: Some(SegmentOutputCheckpoint {
187 block_number: last_pruned_block,
188 tx_number: Some(last_pruned_transaction),
189 }),
190 })
191 }
192}
193
194impl TransactionLookup {
195 fn prune_rocksdb<Provider>(
200 &self,
201 provider: &Provider,
202 input: PruneInput,
203 start: alloy_primitives::TxNumber,
204 end: alloy_primitives::TxNumber,
205 ) -> Result<SegmentOutput, PrunerError>
206 where
207 Provider: DBProvider
208 + BlockReader<Transaction: SignedTransaction>
209 + StaticFileProviderFactory
210 + RocksDBProviderFactory,
211 {
212 if self.mode.is_full() {
214 let rocksdb = provider.rocksdb_provider();
215 rocksdb.clear::<tables::TransactionHashNumbers>()?;
216 trace!(target: "pruner", "Cleared transaction lookup table (RocksDB)");
217
218 let last_pruned_block = provider
219 .block_by_transaction_id(end)?
220 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;
221
222 return Ok(SegmentOutput {
223 progress: PruneProgress::Finished,
224 pruned: 0, checkpoint: Some(SegmentOutputCheckpoint {
226 block_number: Some(last_pruned_block),
227 tx_number: Some(end),
228 }),
229 });
230 }
231
232 let tx_range_end = input
233 .limiter
234 .deleted_entries_limit_left()
235 .map(|left| start.saturating_add(left as u64).saturating_sub(1))
236 .map_or(end, |limited| limited.min(end));
237 let tx_range = start..=tx_range_end;
238
239 let hashes: Vec<_> = provider
241 .transactions_by_tx_range(tx_range.clone())?
242 .into_par_iter()
243 .map(|transaction| *transaction.tx_hash())
244 .collect();
245
246 let tx_count = tx_range.count();
248 if hashes.len() != tx_count {
249 return Err(PrunerError::InconsistentData(
250 "Unexpected number of transaction hashes retrieved by transaction number range",
251 ))
252 }
253
254 let mut limiter = input.limiter;
255
256 let mut deleted = 0usize;
258 provider.with_rocksdb_batch(|mut batch| {
259 for hash in &hashes {
260 if limiter.is_limit_reached() {
261 break;
262 }
263 batch.delete::<tables::TransactionHashNumbers>(*hash)?;
264 limiter.increment_deleted_entries_count();
265 deleted += 1;
266 }
267 Ok(((), Some(batch.into_inner())))
268 })?;
269
270 let done = deleted == hashes.len() && tx_range_end == end;
271 trace!(target: "pruner", %deleted, %done, "Pruned transaction lookup (RocksDB)");
272
273 let last_pruned_transaction =
274 if deleted > 0 { start + deleted as u64 - 1 } else { tx_range_end };
275
276 let last_pruned_block = provider
277 .block_by_transaction_id(last_pruned_transaction)?
278 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
279 .checked_sub(if done { 0 } else { 1 });
280
281 let progress = limiter.progress(done);
282
283 Ok(SegmentOutput {
284 progress,
285 pruned: deleted,
286 checkpoint: Some(SegmentOutputCheckpoint {
287 block_number: last_pruned_block,
288 tx_number: Some(last_pruned_transaction),
289 }),
290 })
291 }
292}
293
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use itertools::{
        FoldWhile::{Continue, Done},
        Itertools,
    };
    use reth_db_api::tables;
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
    use std::ops::Sub;

    /// Prunes the lookup table in limited steps and checks the remaining entry count and the
    /// saved checkpoint after each step.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Map every transaction hash to its position across all blocks.
        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            tx_hash_numbers.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_numbers.len() as u64));
            }
        }
        let tx_hash_numbers_len = tx_hash_numbers.len();
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        assert_eq!(
            db.count_entries::<tables::Transactions>().unwrap(),
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
        );
        assert_eq!(
            db.count_entries::<tables::Transactions>().unwrap(),
            db.table::<tables::TransactionHashNumbers>().unwrap().len()
        );

        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = TransactionLookup::new(prune_mode);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap(),
                to_block,
                // Each step may delete at most 10 entries.
                limiter: PruneLimiter::default().set_deleted_entries_limit(10),
            };

            let next_tx_number_to_prune = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::TransactionLookup)
                .unwrap()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map(|tx_number| tx_number + 1)
                .unwrap_or_default();

            // Expected last pruned transaction: bounded by both `to_block` and the entry limit.
            let last_pruned_tx_number = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum::<usize>()
                .min(
                    next_tx_number_to_prune as usize +
                        input.limiter.deleted_entries_limit().unwrap(),
                )
                .sub(1);

            // Block containing the last pruned transaction.
            let last_pruned_block_number = blocks
                .iter()
                .fold_while((0, 0), |(_, mut tx_count), block| {
                    tx_count += block.transaction_count();

                    if tx_count > last_pruned_tx_number {
                        Done((block.number, tx_count))
                    } else {
                        Continue((block.number, tx_count))
                    }
                })
                .into_inner()
                .0;

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // An unfinished run may have pruned its last block only partially, so the checkpoint
            // is expected one block earlier.
            let last_pruned_block_number = last_pruned_block_number
                .checked_sub(if result.progress.is_finished() { 0 } else { 1 });

            assert_eq!(
                db.table::<tables::TransactionHashNumbers>().unwrap().len(),
                tx_hash_numbers_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        test_prune(
            6,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
        );
        test_prune(6, (PruneProgress::Finished, 2));
        test_prune(10, (PruneProgress::Finished, 8));
    }

    /// Prunes the lookup table through the RocksDB path (storage v2) without a deleted-entries
    /// limit, then checks exactly the entries for blocks 1..=6 were removed.
    #[test]
    fn prune_rocksdb() {
        use reth_db_api::models::StorageSettings;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettingsCache;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Map every transaction hash to its position across all blocks.
        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            tx_hash_numbers.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_numbers.len() as u64));
            }
        }
        let tx_hash_numbers_len = tx_hash_numbers.len();

        // Populate the RocksDB lookup table directly.
        {
            let rocksdb = db.factory.rocksdb_provider();
            let mut batch = rocksdb.batch();
            for (hash, tx_num) in &tx_hash_numbers {
                batch.put::<tables::TransactionHashNumbers>(*hash, tx_num).unwrap();
            }
            batch.commit().expect("commit rocksdb batch");
        }

        // Sanity check: every entry is readable before pruning.
        {
            let rocksdb = db.factory.rocksdb_provider();
            for (hash, expected_tx_num) in &tx_hash_numbers {
                let actual = rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap();
                assert_eq!(actual, Some(*expected_tx_num));
            }
        }

        let to_block: BlockNumber = 6;
        let prune_mode = PruneMode::Before(to_block);
        let input =
            PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
        let segment = TransactionLookup::new(prune_mode);

        // Storage v2 routes the segment to the RocksDB path.
        db.factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = db.factory.database_provider_rw().unwrap();
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        assert_matches!(
            result,
            SegmentOutput { progress: PruneProgress::Finished, pruned, checkpoint: Some(_) }
                if pruned > 0
        );

        let txs_up_to_block_6: usize = blocks.iter().take(6).map(|b| b.transaction_count()).sum();

        // Entries for the first six blocks are gone; all later entries remain.
        {
            let rocksdb = db.factory.rocksdb_provider();
            for (i, (hash, _)) in tx_hash_numbers.iter().enumerate() {
                let entry = rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap();
                if i < txs_up_to_block_6 {
                    assert!(entry.is_none(), "Entry {} (hash {:?}) should be pruned", i, hash);
                } else {
                    assert!(entry.is_some(), "Entry {} (hash {:?}) should still exist", i, hash);
                }
            }
        }

        // The total number of remaining entries matches as well.
        {
            let rocksdb = db.factory.rocksdb_provider();
            let remaining: Vec<_> =
                rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().collect();
            assert_eq!(
                remaining.len(),
                tx_hash_numbers_len - txs_up_to_block_6,
                "Remaining RocksDB entries should match expected"
            );
        }
    }

    /// With the deleted-entries budget already exhausted, nothing is deleted. The returned
    /// checkpoint must stay at the previous transaction (5) instead of moving forward.
    #[test]
    fn prune_rocksdb_zero_deleted_checkpoint() {
        use reth_db_api::models::StorageSettings;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettingsCache;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Map every transaction hash to its position across all blocks.
        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            tx_hash_numbers.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_numbers.len() as u64));
            }
        }

        // Populate the RocksDB lookup table directly.
        {
            let rocksdb = db.factory.rocksdb_provider();
            let mut batch = rocksdb.batch();
            for (hash, tx_num) in &tx_hash_numbers {
                batch.put::<tables::TransactionHashNumbers>(*hash, tx_num).unwrap();
            }
            batch.commit().expect("commit rocksdb batch");
        }

        // Storage v2 routes the segment to the RocksDB path.
        db.factory.set_storage_settings_cache(StorageSettings::v2());

        let to_block: BlockNumber = 6;
        let prune_mode = PruneMode::Before(to_block);

        // Pretend transactions 0..=5 were already pruned.
        let previous_checkpoint =
            Some(PruneCheckpoint { block_number: Some(2), tx_number: Some(5), prune_mode });

        // A limit of one entry that has already been used up.
        let mut limiter = PruneLimiter::default().set_deleted_entries_limit(1);
        limiter.increment_deleted_entries_count();
        let input = PruneInput { previous_checkpoint, to_block, limiter };
        let segment = TransactionLookup::new(prune_mode);

        let provider = db.factory.database_provider_rw().unwrap();
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        assert_eq!(result.pruned, 0, "Nothing should be pruned with exhausted limit");

        // Require a checkpoint: a missing one must fail the test instead of skipping the check.
        assert_eq!(
            result.checkpoint.as_ref().and_then(|checkpoint| checkpoint.tx_number),
            Some(5),
            "Checkpoint should stay at 5 (previous), not advance to 6 (start)"
        );

        {
            let rocksdb = db.factory.rocksdb_provider();
            let remaining: Vec<_> =
                rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().collect();
            assert_eq!(
                remaining.len(),
                tx_hash_numbers.len(),
                "All RocksDB entries should still exist"
            );
        }
    }
}