Skip to main content

reth_prune/segments/user/
transaction_lookup.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{PruneInput, Segment, SegmentOutput},
4    PrunerError,
5};
6use alloy_consensus::transaction::TxHashRef;
7use rayon::prelude::*;
8use reth_db_api::{tables, transaction::DbTxMut};
9use reth_primitives_traits::SignedTransaction;
10use reth_provider::{
11    BlockReader, DBProvider, PruneCheckpointReader, RocksDBProviderFactory,
12    StaticFileProviderFactory,
13};
14use reth_prune_types::{
15    PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutputCheckpoint,
16};
17use reth_static_file_types::StaticFileSegment;
18use reth_storage_api::StorageSettingsCache;
19use tracing::{debug, instrument, trace};
20
21#[derive(Debug)]
22pub struct TransactionLookup {
23    mode: PruneMode,
24}
25
26impl TransactionLookup {
27    pub const fn new(mode: PruneMode) -> Self {
28        Self { mode }
29    }
30}
31
32impl<Provider> Segment<Provider> for TransactionLookup
33where
34    Provider: DBProvider<Tx: DbTxMut>
35        + BlockReader<Transaction: SignedTransaction>
36        + PruneCheckpointReader
37        + StaticFileProviderFactory
38        + StorageSettingsCache
39        + RocksDBProviderFactory,
40{
41    fn segment(&self) -> PruneSegment {
42        PruneSegment::TransactionLookup
43    }
44
45    fn mode(&self) -> Option<PruneMode> {
46        Some(self.mode)
47    }
48
49    fn purpose(&self) -> PrunePurpose {
50        PrunePurpose::User
51    }
52
53    #[instrument(
54        name = "TransactionLookup::prune",
55        target = "pruner",
56        skip(self, provider),
57        ret(level = "trace")
58    )]
59    fn prune(
60        &self,
61        provider: &Provider,
62        mut input: PruneInput,
63    ) -> Result<SegmentOutput, PrunerError> {
64        // It is not possible to prune TransactionLookup data for which we don't have transaction
65        // data. If the TransactionLookup checkpoint is lagging behind (which can happen e.g. when
66        // pre-merge history is dropped and then later tx lookup pruning is enabled) then we can
67        // only prune from the lowest static file.
68        if let Some(lowest_range) =
69            provider.static_file_provider().get_lowest_range(StaticFileSegment::Transactions) &&
70            input
71                .previous_checkpoint
72                .is_none_or(|checkpoint| checkpoint.block_number < Some(lowest_range.start()))
73        {
74            let new_checkpoint = lowest_range.start().saturating_sub(1);
75            if let Some(body_indices) = provider.block_body_indices(new_checkpoint)? {
76                input.previous_checkpoint = Some(PruneCheckpoint {
77                    block_number: Some(new_checkpoint),
78                    tx_number: Some(body_indices.last_tx_num()),
79                    prune_mode: self.mode,
80                });
81                debug!(
82                    target: "pruner",
83                    static_file_checkpoint = ?input.previous_checkpoint,
84                    "Using static file transaction checkpoint as TransactionLookup starting point"
85                );
86            }
87        }
88
89        let (start, end) = match input.get_next_tx_num_range(provider)? {
90            Some(range) => range,
91            None => {
92                trace!(target: "pruner", "No transaction lookup entries to prune");
93                return Ok(SegmentOutput::done())
94            }
95        }
96        .into_inner();
97
98        // Check where transaction hash numbers are stored
99        if provider.cached_storage_settings().storage_v2 {
100            return self.prune_rocksdb(provider, input, start, end);
101        }
102
103        // For PruneMode::Full, clear the entire table in one operation
104        if self.mode.is_full() {
105            let pruned = provider.tx_ref().clear_table::<tables::TransactionHashNumbers>()?;
106            trace!(target: "pruner", %pruned, "Cleared transaction lookup table");
107
108            let last_pruned_block = provider
109                .block_by_transaction_id(end)?
110                .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;
111
112            return Ok(SegmentOutput {
113                progress: PruneProgress::Finished,
114                pruned,
115                checkpoint: Some(SegmentOutputCheckpoint {
116                    block_number: Some(last_pruned_block),
117                    tx_number: Some(end),
118                }),
119            });
120        }
121
122        let tx_range = start..=
123            Some(end)
124                .min(
125                    input
126                        .limiter
127                        .deleted_entries_limit_left()
128                        // Use saturating addition here to avoid panicking on
129                        // `deleted_entries_limit == usize::MAX`
130                        .map(|left| start.saturating_add(left as u64) - 1),
131                )
132                .unwrap();
133        let tx_range_end = *tx_range.end();
134
135        // Retrieve transactions in the range and collect their hashes in parallel.
136        let mut hashes = provider
137            .transactions_by_tx_range(tx_range.clone())?
138            .into_par_iter()
139            .map(|transaction| *transaction.tx_hash())
140            .collect::<Vec<_>>();
141
142        // Sort hashes to enable efficient cursor traversal through the TransactionHashNumbers
143        // table, which is keyed by hash. Without sorting, each seek would be O(log n) random
144        // access; with sorting, the cursor advances sequentially through the B+tree.
145        hashes.sort_unstable();
146
147        // Number of transactions retrieved from the database should match the tx range count
148        let tx_count = tx_range.count();
149        if hashes.len() != tx_count {
150            return Err(PrunerError::InconsistentData(
151                "Unexpected number of transaction hashes retrieved by transaction number range",
152            ))
153        }
154
155        let mut limiter = input.limiter;
156
157        let mut last_pruned_transaction = None;
158        let (pruned, done) =
159            provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
160                hashes,
161                &mut limiter,
162                |row| {
163                    last_pruned_transaction =
164                        Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
165                },
166            )?;
167
168        let done = done && tx_range_end == end;
169        trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");
170
171        let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);
172
173        let last_pruned_block = provider
174            .block_by_transaction_id(last_pruned_transaction)?
175            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
176            // If there's more transaction lookup entries to prune, set the checkpoint block number
177            // to previous, so we could finish pruning its transaction lookup entries on the next
178            // run.
179            .checked_sub(if done { 0 } else { 1 });
180
181        let progress = limiter.progress(done);
182
183        Ok(SegmentOutput {
184            progress,
185            pruned,
186            checkpoint: Some(SegmentOutputCheckpoint {
187                block_number: last_pruned_block,
188                tx_number: Some(last_pruned_transaction),
189            }),
190        })
191    }
192}
193
194impl TransactionLookup {
195    /// Prunes transaction lookup when indices are stored in `RocksDB`.
196    ///
197    /// Reads transactions from static files and deletes corresponding entries
198    /// from the `RocksDB` `TransactionHashNumbers` table.
199    fn prune_rocksdb<Provider>(
200        &self,
201        provider: &Provider,
202        input: PruneInput,
203        start: alloy_primitives::TxNumber,
204        end: alloy_primitives::TxNumber,
205    ) -> Result<SegmentOutput, PrunerError>
206    where
207        Provider: DBProvider
208            + BlockReader<Transaction: SignedTransaction>
209            + StaticFileProviderFactory
210            + RocksDBProviderFactory,
211    {
212        // For PruneMode::Full, clear the entire RocksDB table in one operation
213        if self.mode.is_full() {
214            let rocksdb = provider.rocksdb_provider();
215            rocksdb.clear::<tables::TransactionHashNumbers>()?;
216            trace!(target: "pruner", "Cleared transaction lookup table (RocksDB)");
217
218            let last_pruned_block = provider
219                .block_by_transaction_id(end)?
220                .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;
221
222            return Ok(SegmentOutput {
223                progress: PruneProgress::Finished,
224                pruned: 0, // RocksDB clear doesn't return count
225                checkpoint: Some(SegmentOutputCheckpoint {
226                    block_number: Some(last_pruned_block),
227                    tx_number: Some(end),
228                }),
229            });
230        }
231
232        let tx_range_end = input
233            .limiter
234            .deleted_entries_limit_left()
235            .map(|left| start.saturating_add(left as u64).saturating_sub(1))
236            .map_or(end, |limited| limited.min(end));
237        let tx_range = start..=tx_range_end;
238
239        // Retrieve transactions in the range and collect their hashes in parallel.
240        let hashes: Vec<_> = provider
241            .transactions_by_tx_range(tx_range.clone())?
242            .into_par_iter()
243            .map(|transaction| *transaction.tx_hash())
244            .collect();
245
246        // Number of transactions retrieved from the database should match the tx range count
247        let tx_count = tx_range.count();
248        if hashes.len() != tx_count {
249            return Err(PrunerError::InconsistentData(
250                "Unexpected number of transaction hashes retrieved by transaction number range",
251            ))
252        }
253
254        let mut limiter = input.limiter;
255
256        // Delete transaction hash -> number mappings from RocksDB
257        let mut deleted = 0usize;
258        provider.with_rocksdb_batch(|mut batch| {
259            for hash in &hashes {
260                if limiter.is_limit_reached() {
261                    break;
262                }
263                batch.delete::<tables::TransactionHashNumbers>(*hash)?;
264                limiter.increment_deleted_entries_count();
265                deleted += 1;
266            }
267            Ok(((), Some(batch.into_inner())))
268        })?;
269
270        let done = deleted == hashes.len() && tx_range_end == end;
271        trace!(target: "pruner", %deleted, %done, "Pruned transaction lookup (RocksDB)");
272
273        let last_pruned_transaction =
274            if deleted > 0 { start + deleted as u64 - 1 } else { tx_range_end };
275
276        let last_pruned_block = provider
277            .block_by_transaction_id(last_pruned_transaction)?
278            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
279            .checked_sub(if done { 0 } else { 1 });
280
281        let progress = limiter.progress(done);
282
283        Ok(SegmentOutput {
284            progress,
285            pruned: deleted,
286            checkpoint: Some(SegmentOutputCheckpoint {
287                block_number: last_pruned_block,
288                tx_number: Some(last_pruned_transaction),
289            }),
290        })
291    }
292}
293
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use itertools::{
        FoldWhile::{Continue, Done},
        Itertools,
    };
    use reth_db_api::tables;
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
    use std::ops::Sub;

    /// Writes every `(hash, tx_number)` pair into the `RocksDB` `TransactionHashNumbers` table
    /// and commits the batch.
    fn insert_into_rocksdb(db: &TestStageDB, entries: &[(B256, TxNumber)]) {
        use reth_provider::RocksDBProviderFactory;

        let rocksdb = db.factory.rocksdb_provider();
        let mut batch = rocksdb.batch();
        for (hash, tx_num) in entries {
            batch.put::<tables::TransactionHashNumbers>(*hash, tx_num).unwrap();
        }
        batch.commit().expect("commit rocksdb batch");
    }

    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Pair every transaction hash with its sequential transaction number.
        let tx_hash_numbers: Vec<_> = blocks
            .iter()
            .flat_map(|block| &block.body().transactions)
            .enumerate()
            .map(|(tx_number, transaction)| (*transaction.tx_hash(), tx_number as u64))
            .collect();
        let tx_hash_numbers_len = tx_hash_numbers.len();
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        assert_eq!(
            db.count_entries::<tables::Transactions>().unwrap(),
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
        );
        assert_eq!(
            db.count_entries::<tables::Transactions>().unwrap(),
            db.table::<tables::TransactionHashNumbers>().unwrap().len()
        );

        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = TransactionLookup::new(prune_mode);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap(),
                to_block,
                limiter: PruneLimiter::default().set_deleted_entries_limit(10),
            };

            // First transaction this run should prune: one past the checkpointed transaction.
            let next_tx_number_to_prune = input
                .previous_checkpoint
                .and_then(|checkpoint| checkpoint.tx_number)
                .map(|tx_number| tx_number + 1)
                .unwrap_or_default();

            // Highest transaction expected to be pruned, bounded by both `to_block` and the
            // deleted-entries limit.
            let last_pruned_tx_number = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum::<usize>()
                .min(
                    next_tx_number_to_prune as usize +
                        input.limiter.deleted_entries_limit().unwrap(),
                )
                .sub(1);

            // Number of the block that contains `last_pruned_tx_number`.
            let last_pruned_block_number = blocks
                .iter()
                .fold_while((0, 0), |(_, mut tx_count), block| {
                    tx_count += block.transaction_count();

                    if tx_count > last_pruned_tx_number {
                        Done((block.number, tx_count))
                    } else {
                        Continue((block.number, tx_count))
                    }
                })
                .into_inner()
                .0;

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // An unfinished run checkpoints the block before the partially pruned one.
            let last_pruned_block_number = last_pruned_block_number
                .checked_sub(if result.progress.is_finished() { 0 } else { 1 });

            assert_eq!(
                db.table::<tables::TransactionHashNumbers>().unwrap().len(),
                tx_hash_numbers_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        test_prune(
            6,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
        );
        test_prune(6, (PruneProgress::Finished, 2));
        test_prune(10, (PruneProgress::Finished, 8));
    }

    #[test]
    fn prune_rocksdb() {
        use reth_db_api::models::StorageSettings;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettingsCache;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Pair every transaction hash with its sequential transaction number.
        let tx_hash_numbers: Vec<_> = blocks
            .iter()
            .flat_map(|block| &block.body().transactions)
            .enumerate()
            .map(|(tx_number, transaction)| (*transaction.tx_hash(), tx_number as u64))
            .collect();
        let tx_hash_numbers_len = tx_hash_numbers.len();

        // Insert into RocksDB instead of MDBX
        insert_into_rocksdb(&db, &tx_hash_numbers);

        // Verify RocksDB has all entries
        {
            let rocksdb = db.factory.rocksdb_provider();
            for (hash, expected_tx_num) in &tx_hash_numbers {
                let actual = rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap();
                assert_eq!(actual, Some(*expected_tx_num));
            }
        }

        let to_block: BlockNumber = 6;
        let prune_mode = PruneMode::Before(to_block);
        let input =
            PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
        let segment = TransactionLookup::new(prune_mode);

        // Enable RocksDB storage for transaction hash numbers
        db.factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = db.factory.database_provider_rw().unwrap();
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        assert_matches!(
            result,
            SegmentOutput { progress: PruneProgress::Finished, pruned, checkpoint: Some(_) }
                if pruned > 0
        );

        // Calculate expected: blocks 1-6 should have their tx hashes pruned
        let txs_up_to_block_6: usize = blocks.iter().take(6).map(|b| b.transaction_count()).sum();

        // Verify RocksDB entries: first `txs_up_to_block_6` should be gone
        {
            let rocksdb = db.factory.rocksdb_provider();
            for (i, (hash, _)) in tx_hash_numbers.iter().enumerate() {
                let entry = rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap();
                if i < txs_up_to_block_6 {
                    assert!(entry.is_none(), "Entry {} (hash {:?}) should be pruned", i, hash);
                } else {
                    assert!(entry.is_some(), "Entry {} (hash {:?}) should still exist", i, hash);
                }
            }
        }

        // Verify remaining count
        {
            let rocksdb = db.factory.rocksdb_provider();
            let remaining: Vec<_> =
                rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().collect();
            assert_eq!(
                remaining.len(),
                tx_hash_numbers_len - txs_up_to_block_6,
                "Remaining RocksDB entries should match expected"
            );
        }
    }

    /// Tests that when `RocksDB` prune deletes nothing (limit exhausted), checkpoint doesn't
    /// advance.
    ///
    /// Scenario:
    /// 1. Transactions up to tx 5 are already pruned (checkpoint at tx 5), so the next run
    ///    starts at tx 6.
    /// 2. The deleted entries limit is exhausted before the run begins.
    /// 3. The reported checkpoint must stay at tx 5 rather than advance to the range start.
    #[test]
    fn prune_rocksdb_zero_deleted_checkpoint() {
        use reth_db_api::models::StorageSettings;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettingsCache;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Pair every transaction hash with its sequential transaction number.
        let tx_hash_numbers: Vec<_> = blocks
            .iter()
            .flat_map(|block| &block.body().transactions)
            .enumerate()
            .map(|(tx_number, transaction)| (*transaction.tx_hash(), tx_number as u64))
            .collect();

        // Insert into RocksDB
        insert_into_rocksdb(&db, &tx_hash_numbers);

        // Enable RocksDB storage for transaction hash numbers
        db.factory.set_storage_settings_cache(StorageSettings::v2());

        let to_block: BlockNumber = 6;
        let prune_mode = PruneMode::Before(to_block);

        // Simulate that we've already pruned up to tx 5, so start will be tx 6
        let previous_checkpoint =
            Some(PruneCheckpoint { block_number: Some(2), tx_number: Some(5), prune_mode });

        // Create a limiter with limit of 1, but exhaust it before pruning
        // This means deleted_entries_limit_left() = Some(0)
        let mut limiter = PruneLimiter::default().set_deleted_entries_limit(1);
        limiter.increment_deleted_entries_count(); // Exhaust the limit

        let input = PruneInput { previous_checkpoint, to_block, limiter };
        let segment = TransactionLookup::new(prune_mode);

        let provider = db.factory.database_provider_rw().unwrap();
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        // With an exhausted limit, nothing should be deleted
        assert_eq!(result.pruned, 0, "Nothing should be pruned with exhausted limit");

        // Reporting tx 6 (the range start) would claim an entry that still exists and cause it
        // to be skipped on the next run; the checkpoint must stay at the previous tx 5.
        let checkpoint = result
            .checkpoint
            .expect("a previous checkpoint exists, so a checkpoint should be reported");
        assert_eq!(
            checkpoint.tx_number,
            Some(5),
            "Checkpoint should stay at 5 (previous), not advance to 6 (start)"
        );

        // All RocksDB entries should still exist (nothing was actually deleted)
        {
            let rocksdb = db.factory.rocksdb_provider();
            let remaining: Vec<_> =
                rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().collect();
            assert_eq!(
                remaining.len(),
                tx_hash_numbers.len(),
                "All RocksDB entries should still exist"
            );
        }
    }
}