use crate::{
    db_ext::DbTxPruneExt,
    segments::{PruneInput, Segment, SegmentOutput},
    PrunerError,
};
use alloy_eips::eip2718::Encodable2718;
use rayon::prelude::*;
use reth_db_api::{tables, transaction::DbTxMut};
use reth_provider::{
    BlockReader, DBProvider, PruneCheckpointReader, RocksDBProviderFactory,
    StaticFileProviderFactory,
};
use reth_prune_types::{
    PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::StorageSettingsCache;
use tracing::{debug, instrument, trace};

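/// Prune segment responsible for the [`tables::TransactionHashNumbers`] table, which maps each
/// transaction hash to its transaction number.
///
/// A minimal construction sketch (illustrative only; wiring the segment into a pruner run is
/// outside the scope of this example):
///
/// ```ignore
/// use reth_prune_types::PruneMode;
///
/// // Keep lookup entries only for roughly the most recent 100_000 blocks.
/// let segment = TransactionLookup::new(PruneMode::Distance(100_000));
///
/// // Alternatively, drop the whole table on every run.
/// let full = TransactionLookup::new(PruneMode::Full);
/// ```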
#[derive(Debug)]
pub struct TransactionLookup {
    mode: PruneMode,
}

impl TransactionLookup {
    /// Creates a new [`TransactionLookup`] segment with the given prune mode.
    pub const fn new(mode: PruneMode) -> Self {
        Self { mode }
    }
}

impl<Provider> Segment<Provider> for TransactionLookup
where
    Provider: DBProvider<Tx: DbTxMut>
        + BlockReader<Transaction: Encodable2718>
        + PruneCheckpointReader
        + StaticFileProviderFactory
        + StorageSettingsCache
        + RocksDBProviderFactory,
{
    fn segment(&self) -> PruneSegment {
        PruneSegment::TransactionLookup
    }

    fn mode(&self) -> Option<PruneMode> {
        Some(self.mode)
    }

    fn purpose(&self) -> PrunePurpose {
        PrunePurpose::User
    }

    #[instrument(
        name = "TransactionLookup::prune",
        target = "pruner",
        skip(self, provider),
        ret(level = "trace")
    )]
    fn prune(
        &self,
        provider: &Provider,
        mut input: PruneInput,
    ) -> Result<SegmentOutput, PrunerError> {
        // If the lowest transaction range still available in static files starts above the
        // previous checkpoint, fast-forward the checkpoint to the block right before that range:
        // hashes for earlier transactions can no longer be read from static files.
        if let Some(lowest_range) =
            provider.static_file_provider().get_lowest_range(StaticFileSegment::Transactions) &&
            input
                .previous_checkpoint
                .is_none_or(|checkpoint| checkpoint.block_number < Some(lowest_range.start()))
        {
            let new_checkpoint = lowest_range.start().saturating_sub(1);
            if let Some(body_indices) = provider.block_body_indices(new_checkpoint)? {
                input.previous_checkpoint = Some(PruneCheckpoint {
                    block_number: Some(new_checkpoint),
                    tx_number: Some(body_indices.last_tx_num()),
                    prune_mode: self.mode,
                });
                debug!(
                    target: "pruner",
                    static_file_checkpoint = ?input.previous_checkpoint,
                    "Using static file transaction checkpoint as TransactionLookup starting point"
                );
            }
        }

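        // Resolve the next range of transaction numbers to prune, based on the previous
        // checkpoint and the target block of this run.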
        let (start, end) = match input.get_next_tx_num_range(provider)? {
            Some(range) => range,
            None => {
                trace!(target: "pruner", "No transaction lookup entries to prune");
                return Ok(SegmentOutput::done())
            }
        }
        .into_inner();

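        // With the v2 storage settings the lookup table is kept in RocksDB, so pruning is
        // delegated to the dedicated RocksDB path below.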
        #[cfg(all(unix, feature = "rocksdb"))]
        if provider.cached_storage_settings().storage_v2 {
            return self.prune_rocksdb(provider, input, start, end);
        }

        // In full prune mode the whole lookup table can be cleared in one call.
        if self.mode.is_full() {
            let pruned = provider.tx_ref().clear_table::<tables::TransactionHashNumbers>()?;
            trace!(target: "pruner", %pruned, "Cleared transaction lookup table");

            let last_pruned_block = provider
                .block_by_transaction_id(end)?
                .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;

            return Ok(SegmentOutput {
                progress: PruneProgress::Finished,
                pruned,
                checkpoint: Some(SegmentOutputCheckpoint {
                    block_number: Some(last_pruned_block),
                    tx_number: Some(end),
                }),
            });
        }

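        // Cap the end of the prune range by the number of deletions the limiter still allows,
        // e.g. with `start = 100`, `end = 500` and 10 deletions left, the range becomes
        // `100..=109`.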
        let tx_range_end = input
            .limiter
            .deleted_entries_limit_left()
            .map(|left| start.saturating_add(left as u64).saturating_sub(1))
            .map_or(end, |limited| limited.min(end));
        let tx_range = start..=tx_range_end;

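        // Load the transactions for the range and hash them in parallel.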
        let mut hashes = provider
            .transactions_by_tx_range(tx_range.clone())?
            .into_par_iter()
            .map(|transaction| transaction.trie_hash())
            .collect::<Vec<_>>();

        // Sort by hash so the table entries are deleted in key order.
        hashes.sort_unstable();

        // The number of hashes must match the number of transactions in the requested range.
        let tx_count = tx_range.count();
        if hashes.len() != tx_count {
            return Err(PrunerError::InconsistentData(
                "Unexpected number of transaction hashes retrieved by transaction number range",
            ))
        }

        let mut limiter = input.limiter;

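        // Delete the collected hashes from the table, tracking the highest pruned transaction
        // number so it can be reported in the checkpoint.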
        let mut last_pruned_transaction = None;
        let (pruned, done) =
            provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
                hashes,
                &mut limiter,
                |row| {
                    last_pruned_transaction =
                        Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
                },
            )?;

        let done = done && tx_range_end == end;
        trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");

        let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);

        // If pruning is not done yet, point the checkpoint one block earlier, since the last
        // pruned block may still contain unpruned transactions.
        let last_pruned_block = provider
            .block_by_transaction_id(last_pruned_transaction)?
            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
            .checked_sub(if done { 0 } else { 1 });

        let progress = limiter.progress(done);

        Ok(SegmentOutput {
            progress,
            pruned,
            checkpoint: Some(SegmentOutputCheckpoint {
                block_number: last_pruned_block,
                tx_number: Some(last_pruned_transaction),
            }),
        })
    }
}

impl TransactionLookup {
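    /// Prunes transaction lookup entries stored in RocksDB.
    ///
    /// This path is taken by `prune` when the cached storage settings report `storage_v2`, in
    /// which case the [`tables::TransactionHashNumbers`] entries are read and deleted through the
    /// RocksDB provider rather than the database transaction.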
    #[cfg(all(unix, feature = "rocksdb"))]
    fn prune_rocksdb<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        start: alloy_primitives::TxNumber,
        end: alloy_primitives::TxNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider
            + BlockReader<Transaction: Encodable2718>
            + StaticFileProviderFactory
            + RocksDBProviderFactory,
    {
        // In full prune mode the whole RocksDB table can be cleared in one call.
        if self.mode.is_full() {
            let rocksdb = provider.rocksdb_provider();
            rocksdb.clear::<tables::TransactionHashNumbers>()?;
            trace!(target: "pruner", "Cleared transaction lookup table (RocksDB)");

            let last_pruned_block = provider
                .block_by_transaction_id(end)?
                .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;

            return Ok(SegmentOutput {
                progress: PruneProgress::Finished,
                pruned: 0,
                checkpoint: Some(SegmentOutputCheckpoint {
                    block_number: Some(last_pruned_block),
                    tx_number: Some(end),
                }),
            });
        }

        // Cap the end of the prune range by the number of deletions the limiter still allows.
        let tx_range_end = input
            .limiter
            .deleted_entries_limit_left()
            .map(|left| start.saturating_add(left as u64).saturating_sub(1))
            .map_or(end, |limited| limited.min(end));
        let tx_range = start..=tx_range_end;

        // Load the transactions for the range and hash them in parallel.
        let hashes: Vec<_> = provider
            .transactions_by_tx_range(tx_range.clone())?
            .into_par_iter()
            .map(|transaction| transaction.trie_hash())
            .collect();

        // The number of hashes must match the number of transactions in the requested range.
        let tx_count = tx_range.count();
        if hashes.len() != tx_count {
            return Err(PrunerError::InconsistentData(
                "Unexpected number of transaction hashes retrieved by transaction number range",
            ))
        }

        let mut limiter = input.limiter;

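        // Delete the entries in a single RocksDB write batch, stopping early once the limiter
        // runs out of deletions.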
        let mut deleted = 0usize;
        provider.with_rocksdb_batch(|mut batch| {
            for hash in &hashes {
                if limiter.is_limit_reached() {
                    break;
                }
                batch.delete::<tables::TransactionHashNumbers>(*hash)?;
                limiter.increment_deleted_entries_count();
                deleted += 1;
            }
            Ok(((), Some(batch.into_inner())))
        })?;

        let done = deleted == hashes.len() && tx_range_end == end;
        trace!(target: "pruner", %deleted, %done, "Pruned transaction lookup (RocksDB)");

        // Hashes were collected in transaction-number order, so the last pruned transaction is
        // simply `start + deleted - 1` when anything was deleted.
        let last_pruned_transaction =
            if deleted > 0 { start + deleted as u64 - 1 } else { tx_range_end };

        // Same checkpoint adjustment as the MDBX path: step one block back if not done.
        let last_pruned_block = provider
            .block_by_transaction_id(last_pruned_transaction)?
            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
            .checked_sub(if done { 0 } else { 1 });

        let progress = limiter.progress(done);

        Ok(SegmentOutput {
            progress,
            pruned: deleted,
            checkpoint: Some(SegmentOutputCheckpoint {
                block_number: last_pruned_block,
                tx_number: Some(last_pruned_transaction),
            }),
        })
    }
}

#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use itertools::{
        FoldWhile::{Continue, Done},
        Itertools,
    };
    use reth_db_api::tables;
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
    use std::ops::Sub;

    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            tx_hash_numbers.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_numbers.len() as u64));
            }
        }
        let tx_hash_numbers_len = tx_hash_numbers.len();
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        assert_eq!(
            db.count_entries::<tables::Transactions>().unwrap(),
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
        );
        assert_eq!(
            db.count_entries::<tables::Transactions>().unwrap(),
            db.table::<tables::TransactionHashNumbers>().unwrap().len()
        );

        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = TransactionLookup::new(prune_mode);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap(),
                to_block,
                limiter: limiter.clone(),
            };

            let next_tx_number_to_prune = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::TransactionLookup)
                .unwrap()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map(|tx_number| tx_number + 1)
                .unwrap_or_default();

            let last_pruned_tx_number = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum::<usize>()
                .min(
                    next_tx_number_to_prune as usize +
                        input.limiter.deleted_entries_limit().unwrap(),
                )
                .sub(1);

            let last_pruned_block_number = blocks
                .iter()
                .fold_while((0, 0), |(_, mut tx_count), block| {
                    tx_count += block.transaction_count();

                    if tx_count > last_pruned_tx_number {
                        Done((block.number, tx_count))
                    } else {
                        Continue((block.number, tx_count))
                    }
                })
                .into_inner()
                .0;

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput { progress, pruned, checkpoint: Some(_) }
                    if (progress, pruned) == expected_result
            );

            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            let last_pruned_block_number = last_pruned_block_number
                .checked_sub(if result.progress.is_finished() { 0 } else { 1 });

            assert_eq!(
                db.table::<tables::TransactionHashNumbers>().unwrap().len(),
                tx_hash_numbers_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        test_prune(
            6,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
        );
        test_prune(6, (PruneProgress::Finished, 2));
        test_prune(10, (PruneProgress::Finished, 8));
    }

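    // End-to-end check of the RocksDB pruning path: insert lookup entries for 10 blocks, prune
    // the entries for blocks 1..=6 with storage v2 enabled, and verify exactly the entries for
    // the pruned transactions are gone.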
    #[cfg(all(unix, feature = "rocksdb"))]
    #[test]
    fn prune_rocksdb() {
        use reth_db_api::models::StorageSettings;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettingsCache;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Build (hash, tx number) pairs for every inserted transaction.
        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            tx_hash_numbers.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_numbers.len() as u64));
            }
        }
        let tx_hash_numbers_len = tx_hash_numbers.len();

        // Insert the lookup entries into RocksDB.
        {
            let rocksdb = db.factory.rocksdb_provider();
            let mut batch = rocksdb.batch();
            for (hash, tx_num) in &tx_hash_numbers {
                batch.put::<tables::TransactionHashNumbers>(*hash, tx_num).unwrap();
            }
            batch.commit().expect("commit rocksdb batch");
        }

        // Sanity check: every entry is readable before pruning.
        {
            let rocksdb = db.factory.rocksdb_provider();
            for (hash, expected_tx_num) in &tx_hash_numbers {
                let actual = rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap();
                assert_eq!(actual, Some(*expected_tx_num));
            }
        }

        let to_block: BlockNumber = 6;
        let prune_mode = PruneMode::Before(to_block);
        let input =
            PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
        let segment = TransactionLookup::new(prune_mode);

        // Enable storage v2 so pruning takes the RocksDB path.
        db.factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = db.factory.database_provider_rw().unwrap();
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        assert_matches!(
            result,
            SegmentOutput { progress: PruneProgress::Finished, pruned, checkpoint: Some(_) }
                if pruned > 0
        );

        // Transactions of blocks 1..=6 are expected to be pruned.
        let txs_up_to_block_6: usize = blocks.iter().take(6).map(|b| b.transaction_count()).sum();

        // Entries for pruned transactions must be gone, the rest must remain.
        {
            let rocksdb = db.factory.rocksdb_provider();
            for (i, (hash, _)) in tx_hash_numbers.iter().enumerate() {
                let entry = rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap();
                if i < txs_up_to_block_6 {
                    assert!(entry.is_none(), "Entry {} (hash {:?}) should be pruned", i, hash);
                } else {
                    assert!(entry.is_some(), "Entry {} (hash {:?}) should still exist", i, hash);
                }
            }
        }

        // The total number of remaining entries must match as well.
        {
            let rocksdb = db.factory.rocksdb_provider();
            let remaining: Vec<_> =
                rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().collect();
            assert_eq!(
                remaining.len(),
                tx_hash_numbers_len - txs_up_to_block_6,
                "Remaining RocksDB entries should match expected"
            );
        }
    }

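    // Regression check for an exhausted limiter: when no deletions are allowed, the segment must
    // not prune anything and the reported checkpoint must not advance past the previous one
    // (tx_number stays at 5 instead of jumping to the start of the unpruned range).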
    #[cfg(all(unix, feature = "rocksdb"))]
    #[test]
    fn prune_rocksdb_zero_deleted_checkpoint() {
        use reth_db_api::models::StorageSettings;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettingsCache;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Build (hash, tx number) pairs for every inserted transaction.
        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            tx_hash_numbers.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_numbers.len() as u64));
            }
        }

        // Insert the lookup entries into RocksDB.
        {
            let rocksdb = db.factory.rocksdb_provider();
            let mut batch = rocksdb.batch();
            for (hash, tx_num) in &tx_hash_numbers {
                batch.put::<tables::TransactionHashNumbers>(*hash, tx_num).unwrap();
            }
            batch.commit().expect("commit rocksdb batch");
        }

        db.factory.set_storage_settings_cache(StorageSettings::v2());

        let to_block: BlockNumber = 6;
        let prune_mode = PruneMode::Before(to_block);

        // A previous checkpoint that already covers transactions up to tx number 5.
        let previous_checkpoint =
            Some(PruneCheckpoint { block_number: Some(2), tx_number: Some(5), prune_mode });

        // Exhaust a limit of one deletion up front so this run is not allowed to prune anything.
        let mut limiter = PruneLimiter::default().set_deleted_entries_limit(1);
        limiter.increment_deleted_entries_count();

        let input = PruneInput { previous_checkpoint, to_block, limiter };
        let segment = TransactionLookup::new(prune_mode);

        let provider = db.factory.database_provider_rw().unwrap();
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        assert_eq!(result.pruned, 0, "Nothing should be pruned with exhausted limit");

        // The checkpoint must not move past the previously pruned transaction.
        if let Some(checkpoint) = &result.checkpoint {
            assert_eq!(
                checkpoint.tx_number,
                Some(5),
                "Checkpoint should stay at 5 (previous), not advance to 6 (start)"
            );
        }

        // No entries may have been deleted from RocksDB.
        {
            let rocksdb = db.factory.rocksdb_provider();
            let remaining: Vec<_> =
                rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().collect();
            assert_eq!(
                remaining.len(),
                tx_hash_numbers.len(),
                "All RocksDB entries should still exist"
            );
        }
    }
}
626}