1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment, SegmentOutput},
4 PrunerError,
5};
6use alloy_eips::eip2718::Encodable2718;
7use rayon::prelude::*;
8use reth_db_api::{tables, transaction::DbTxMut};
9use reth_provider::{
10 BlockReader, DBProvider, PruneCheckpointReader, RocksDBProviderFactory,
11 StaticFileProviderFactory,
12};
13use reth_prune_types::{
14 PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutputCheckpoint,
15};
16use reth_static_file_types::StaticFileSegment;
17use reth_storage_api::StorageSettingsCache;
18use tracing::{debug, instrument, trace};
19
20#[derive(Debug)]
21pub struct TransactionLookup {
22 mode: PruneMode,
23}
24
25impl TransactionLookup {
26 pub const fn new(mode: PruneMode) -> Self {
27 Self { mode }
28 }
29}
30
31impl<Provider> Segment<Provider> for TransactionLookup
32where
33 Provider: DBProvider<Tx: DbTxMut>
34 + BlockReader<Transaction: Encodable2718>
35 + PruneCheckpointReader
36 + StaticFileProviderFactory
37 + StorageSettingsCache
38 + RocksDBProviderFactory,
39{
40 fn segment(&self) -> PruneSegment {
41 PruneSegment::TransactionLookup
42 }
43
44 fn mode(&self) -> Option<PruneMode> {
45 Some(self.mode)
46 }
47
48 fn purpose(&self) -> PrunePurpose {
49 PrunePurpose::User
50 }
51
52 #[instrument(
53 name = "TransactionLookup::prune",
54 target = "pruner",
55 skip(self, provider),
56 ret(level = "trace")
57 )]
58 fn prune(
59 &self,
60 provider: &Provider,
61 mut input: PruneInput,
62 ) -> Result<SegmentOutput, PrunerError> {
63 if let Some(lowest_range) =
68 provider.static_file_provider().get_lowest_range(StaticFileSegment::Transactions) &&
69 input
70 .previous_checkpoint
71 .is_none_or(|checkpoint| checkpoint.block_number < Some(lowest_range.start()))
72 {
73 let new_checkpoint = lowest_range.start().saturating_sub(1);
74 if let Some(body_indices) = provider.block_body_indices(new_checkpoint)? {
75 input.previous_checkpoint = Some(PruneCheckpoint {
76 block_number: Some(new_checkpoint),
77 tx_number: Some(body_indices.last_tx_num()),
78 prune_mode: self.mode,
79 });
80 debug!(
81 target: "pruner",
82 static_file_checkpoint = ?input.previous_checkpoint,
83 "Using static file transaction checkpoint as TransactionLookup starting point"
84 );
85 }
86 }
87
88 let (start, end) = match input.get_next_tx_num_range(provider)? {
89 Some(range) => range,
90 None => {
91 trace!(target: "pruner", "No transaction lookup entries to prune");
92 return Ok(SegmentOutput::done())
93 }
94 }
95 .into_inner();
96
97 if provider.cached_storage_settings().storage_v2 {
99 return self.prune_rocksdb(provider, input, start, end);
100 }
101
102 if self.mode.is_full() {
104 let pruned = provider.tx_ref().clear_table::<tables::TransactionHashNumbers>()?;
105 trace!(target: "pruner", %pruned, "Cleared transaction lookup table");
106
107 let last_pruned_block = provider
108 .block_by_transaction_id(end)?
109 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;
110
111 return Ok(SegmentOutput {
112 progress: PruneProgress::Finished,
113 pruned,
114 checkpoint: Some(SegmentOutputCheckpoint {
115 block_number: Some(last_pruned_block),
116 tx_number: Some(end),
117 }),
118 });
119 }
120
121 let tx_range = start..=
122 Some(end)
123 .min(
124 input
125 .limiter
126 .deleted_entries_limit_left()
127 .map(|left| start.saturating_add(left as u64) - 1),
130 )
131 .unwrap();
132 let tx_range_end = *tx_range.end();
133
134 let mut hashes = provider
136 .transactions_by_tx_range(tx_range.clone())?
137 .into_par_iter()
138 .map(|transaction| transaction.trie_hash())
139 .collect::<Vec<_>>();
140
141 hashes.sort_unstable();
145
146 let tx_count = tx_range.count();
148 if hashes.len() != tx_count {
149 return Err(PrunerError::InconsistentData(
150 "Unexpected number of transaction hashes retrieved by transaction number range",
151 ))
152 }
153
154 let mut limiter = input.limiter;
155
156 let mut last_pruned_transaction = None;
157 let (pruned, done) =
158 provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
159 hashes,
160 &mut limiter,
161 |row| {
162 last_pruned_transaction =
163 Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
164 },
165 )?;
166
167 let done = done && tx_range_end == end;
168 trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");
169
170 let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);
171
172 let last_pruned_block = provider
173 .block_by_transaction_id(last_pruned_transaction)?
174 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
175 .checked_sub(if done { 0 } else { 1 });
179
180 let progress = limiter.progress(done);
181
182 Ok(SegmentOutput {
183 progress,
184 pruned,
185 checkpoint: Some(SegmentOutputCheckpoint {
186 block_number: last_pruned_block,
187 tx_number: Some(last_pruned_transaction),
188 }),
189 })
190 }
191}
192
193impl TransactionLookup {
194 fn prune_rocksdb<Provider>(
199 &self,
200 provider: &Provider,
201 input: PruneInput,
202 start: alloy_primitives::TxNumber,
203 end: alloy_primitives::TxNumber,
204 ) -> Result<SegmentOutput, PrunerError>
205 where
206 Provider: DBProvider
207 + BlockReader<Transaction: Encodable2718>
208 + StaticFileProviderFactory
209 + RocksDBProviderFactory,
210 {
211 if self.mode.is_full() {
213 let rocksdb = provider.rocksdb_provider();
214 rocksdb.clear::<tables::TransactionHashNumbers>()?;
215 trace!(target: "pruner", "Cleared transaction lookup table (RocksDB)");
216
217 let last_pruned_block = provider
218 .block_by_transaction_id(end)?
219 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;
220
221 return Ok(SegmentOutput {
222 progress: PruneProgress::Finished,
223 pruned: 0, checkpoint: Some(SegmentOutputCheckpoint {
225 block_number: Some(last_pruned_block),
226 tx_number: Some(end),
227 }),
228 });
229 }
230
231 let tx_range_end = input
232 .limiter
233 .deleted_entries_limit_left()
234 .map(|left| start.saturating_add(left as u64).saturating_sub(1))
235 .map_or(end, |limited| limited.min(end));
236 let tx_range = start..=tx_range_end;
237
238 let hashes: Vec<_> = provider
240 .transactions_by_tx_range(tx_range.clone())?
241 .into_par_iter()
242 .map(|transaction| transaction.trie_hash())
243 .collect();
244
245 let tx_count = tx_range.count();
247 if hashes.len() != tx_count {
248 return Err(PrunerError::InconsistentData(
249 "Unexpected number of transaction hashes retrieved by transaction number range",
250 ))
251 }
252
253 let mut limiter = input.limiter;
254
255 let mut deleted = 0usize;
257 provider.with_rocksdb_batch(|mut batch| {
258 for hash in &hashes {
259 if limiter.is_limit_reached() {
260 break;
261 }
262 batch.delete::<tables::TransactionHashNumbers>(*hash)?;
263 limiter.increment_deleted_entries_count();
264 deleted += 1;
265 }
266 Ok(((), Some(batch.into_inner())))
267 })?;
268
269 let done = deleted == hashes.len() && tx_range_end == end;
270 trace!(target: "pruner", %deleted, %done, "Pruned transaction lookup (RocksDB)");
271
272 let last_pruned_transaction =
273 if deleted > 0 { start + deleted as u64 - 1 } else { tx_range_end };
274
275 let last_pruned_block = provider
276 .block_by_transaction_id(last_pruned_transaction)?
277 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
278 .checked_sub(if done { 0 } else { 1 });
279
280 let progress = limiter.progress(done);
281
282 Ok(SegmentOutput {
283 progress,
284 pruned: deleted,
285 checkpoint: Some(SegmentOutputCheckpoint {
286 block_number: last_pruned_block,
287 tx_number: Some(last_pruned_transaction),
288 }),
289 })
290 }
291}
292
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use itertools::{
        FoldWhile::{Continue, Done},
        Itertools,
    };
    use reth_db_api::tables;
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
    use std::ops::Sub;

    /// Prunes the database-backed lookup table in several runs with a limit of 10 deleted
    /// entries per run, checking the reported progress, the remaining table size and the saved
    /// checkpoint after every run.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Blocks 1..=10 with `tx_count: 2..3` transactions each.
        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Assign consecutive transaction numbers, starting at 0, in block order.
        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            tx_hash_numbers.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_numbers.len() as u64));
            }
        }
        let tx_hash_numbers_len = tx_hash_numbers.len();
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        // Sanity checks: every transaction is stored and has a lookup entry.
        assert_eq!(
            db.count_entries::<tables::Transactions>().unwrap(),
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
        );
        assert_eq!(
            db.count_entries::<tables::Transactions>().unwrap(),
            db.table::<tables::TransactionHashNumbers>().unwrap().len()
        );

        // Runs one prune pass up to `to_block` and verifies its outcome against
        // `expected_result` (progress, number of pruned entries).
        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = TransactionLookup::new(prune_mode);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap(),
                to_block,
                limiter: limiter.clone(),
            };

            // First transaction number this run is expected to start from.
            let next_tx_number_to_prune = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::TransactionLookup)
                .unwrap()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map(|tx_number| tx_number + 1)
                .unwrap_or_default();

            // Last transaction number expected to be pruned: either the last transaction of
            // `to_block` or wherever the deletion limit cuts the run short.
            let last_pruned_tx_number = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum::<usize>()
                .min(
                    next_tx_number_to_prune as usize +
                        input.limiter.deleted_entries_limit().unwrap(),
                )
                .sub(1);

            // Block that contains `last_pruned_tx_number`.
            let last_pruned_block_number = blocks
                .iter()
                .fold_while((0, 0), |(_, mut tx_count), block| {
                    tx_count += block.transaction_count();

                    if tx_count > last_pruned_tx_number {
                        Done((block.number, tx_count))
                    } else {
                        Continue((block.number, tx_count))
                    }
                })
                .into_inner()
                .0;

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // An unfinished run reports the previous block in its checkpoint.
            let last_pruned_block_number = last_pruned_block_number
                .checked_sub(if result.progress.is_finished() { 0 } else { 1 });

            assert_eq!(
                db.table::<tables::TransactionHashNumbers>().unwrap().len(),
                tx_hash_numbers_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        // First run hits the limit of 10, the second run prunes the remaining entries up to
        // block 6, and the third run prunes the rest up to block 10.
        test_prune(
            6,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
        );
        test_prune(6, (PruneProgress::Finished, 2));
        test_prune(10, (PruneProgress::Finished, 8));
    }

    /// Prunes lookup entries stored in `RocksDB` (storage v2) without a deletion limit and
    /// checks that exactly the entries up to block 6 are removed.
    #[test]
    fn prune_rocksdb() {
        use reth_db_api::models::StorageSettings;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettingsCache;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Assign consecutive transaction numbers, starting at 0, in block order.
        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            tx_hash_numbers.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_numbers.len() as u64));
            }
        }
        let tx_hash_numbers_len = tx_hash_numbers.len();

        // Write the lookup entries into RocksDB.
        {
            let rocksdb = db.factory.rocksdb_provider();
            let mut batch = rocksdb.batch();
            for (hash, tx_num) in &tx_hash_numbers {
                batch.put::<tables::TransactionHashNumbers>(*hash, tx_num).unwrap();
            }
            batch.commit().expect("commit rocksdb batch");
        }

        // Verify the entries can be read back before pruning.
        {
            let rocksdb = db.factory.rocksdb_provider();
            for (hash, expected_tx_num) in &tx_hash_numbers {
                let actual = rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap();
                assert_eq!(actual, Some(*expected_tx_num));
            }
        }

        let to_block: BlockNumber = 6;
        let prune_mode = PruneMode::Before(to_block);
        let input =
            PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
        let segment = TransactionLookup::new(prune_mode);

        // Enable storage v2 so that `prune` takes the RocksDB path.
        db.factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = db.factory.database_provider_rw().unwrap();
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        assert_matches!(
            result,
            SegmentOutput { progress: PruneProgress::Finished, pruned, checkpoint: Some(_) }
                if pruned > 0
        );

        let txs_up_to_block_6: usize = blocks.iter().take(6).map(|b| b.transaction_count()).sum();

        // Entries of the first six blocks must be gone, all later ones must still exist.
        {
            let rocksdb = db.factory.rocksdb_provider();
            for (i, (hash, _)) in tx_hash_numbers.iter().enumerate() {
                let entry = rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap();
                if i < txs_up_to_block_6 {
                    assert!(entry.is_none(), "Entry {} (hash {:?}) should be pruned", i, hash);
                } else {
                    assert!(entry.is_some(), "Entry {} (hash {:?}) should still exist", i, hash);
                }
            }
        }

        // The total number of remaining entries must match as well.
        {
            let rocksdb = db.factory.rocksdb_provider();
            let remaining: Vec<_> =
                rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().collect();
            assert_eq!(
                remaining.len(),
                tx_hash_numbers_len - txs_up_to_block_6,
                "Remaining RocksDB entries should match expected"
            );
        }
    }

    /// With an already exhausted limiter nothing may be deleted from `RocksDB`, and the
    /// returned checkpoint must stay at the previous transaction number instead of advancing.
    #[test]
    fn prune_rocksdb_zero_deleted_checkpoint() {
        use reth_db_api::models::StorageSettings;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettingsCache;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Assign consecutive transaction numbers, starting at 0, in block order.
        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            tx_hash_numbers.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_numbers.len() as u64));
            }
        }

        // Write the lookup entries into RocksDB.
        {
            let rocksdb = db.factory.rocksdb_provider();
            let mut batch = rocksdb.batch();
            for (hash, tx_num) in &tx_hash_numbers {
                batch.put::<tables::TransactionHashNumbers>(*hash, tx_num).unwrap();
            }
            batch.commit().expect("commit rocksdb batch");
        }

        // Enable storage v2 so that `prune` takes the RocksDB path.
        db.factory.set_storage_settings_cache(StorageSettings::v2());

        let to_block: BlockNumber = 6;
        let prune_mode = PruneMode::Before(to_block);

        // Pretend a previous run already pruned up to transaction 5.
        let previous_checkpoint =
            Some(PruneCheckpoint { block_number: Some(2), tx_number: Some(5), prune_mode });

        // A limit of one entry that is already used up leaves no budget for this run.
        let mut limiter = PruneLimiter::default().set_deleted_entries_limit(1);
        limiter.increment_deleted_entries_count();
        let input = PruneInput { previous_checkpoint, to_block, limiter };
        let segment = TransactionLookup::new(prune_mode);

        let provider = db.factory.database_provider_rw().unwrap();
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        assert_eq!(result.pruned, 0, "Nothing should be pruned with exhausted limit");

        // Require the checkpoint to be present: a conditional check would silently pass if the
        // segment stopped returning one.
        let checkpoint = result
            .checkpoint
            .as_ref()
            .expect("An unfinished prune run should still return a checkpoint");
        assert_eq!(
            checkpoint.tx_number,
            Some(5),
            "Checkpoint should stay at 5 (previous), not advance to 6 (start)"
        );

        // No entry may have been removed.
        {
            let rocksdb = db.factory.rocksdb_provider();
            let remaining: Vec<_> =
                rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().collect();
            assert_eq!(
                remaining.len(),
                tx_hash_numbers.len(),
                "All RocksDB entries should still exist"
            );
        }
    }
}