
reth_prune/segments/user/transaction_lookup.rs

use crate::{
    db_ext::DbTxPruneExt,
    segments::{PruneInput, Segment, SegmentOutput},
    PrunerError,
};
use alloy_eips::eip2718::Encodable2718;
use rayon::prelude::*;
use reth_db_api::{tables, transaction::DbTxMut};
use reth_provider::{
    BlockReader, DBProvider, PruneCheckpointReader, RocksDBProviderFactory,
    StaticFileProviderFactory,
};
use reth_prune_types::{
    PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::StorageSettingsCache;
use tracing::{debug, instrument, trace};

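/// Segment responsible for pruning the [`tables::TransactionHashNumbers`] table, which maps
/// transaction hashes to transaction numbers, according to the configured [`PruneMode`].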
#[derive(Debug)]
pub struct TransactionLookup {
    mode: PruneMode,
}

impl TransactionLookup {
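    /// Creates a new [`TransactionLookup`] segment that prunes according to the given mode.
    ///
    /// # Example
    ///
    /// A minimal usage sketch; the mode value is illustrative:
    ///
    /// ```ignore
    /// // Prune all lookup entries for blocks before block 1_000_000.
    /// let segment = TransactionLookup::new(PruneMode::Before(1_000_000));
    /// ```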
    pub const fn new(mode: PruneMode) -> Self {
        Self { mode }
    }
}

impl<Provider> Segment<Provider> for TransactionLookup
where
    Provider: DBProvider<Tx: DbTxMut>
        + BlockReader<Transaction: Encodable2718>
        + PruneCheckpointReader
        + StaticFileProviderFactory
        + StorageSettingsCache
        + RocksDBProviderFactory,
{
    fn segment(&self) -> PruneSegment {
        PruneSegment::TransactionLookup
    }

    fn mode(&self) -> Option<PruneMode> {
        Some(self.mode)
    }

    fn purpose(&self) -> PrunePurpose {
        PrunePurpose::User
    }

    #[instrument(
        name = "TransactionLookup::prune",
        target = "pruner",
        skip(self, provider),
        ret(level = "trace")
    )]
    fn prune(
        &self,
        provider: &Provider,
        mut input: PruneInput,
    ) -> Result<SegmentOutput, PrunerError> {
        // It is not possible to prune TransactionLookup data for which we don't have transaction
        // data. If the TransactionLookup checkpoint is lagging behind (which can happen e.g. when
        // pre-merge history is dropped and then later tx lookup pruning is enabled) then we can
        // only prune from the lowest static file.
        if let Some(lowest_range) =
            provider.static_file_provider().get_lowest_range(StaticFileSegment::Transactions) &&
            input
                .previous_checkpoint
                .is_none_or(|checkpoint| checkpoint.block_number < Some(lowest_range.start()))
        {
            let new_checkpoint = lowest_range.start().saturating_sub(1);
            if let Some(body_indices) = provider.block_body_indices(new_checkpoint)? {
                input.previous_checkpoint = Some(PruneCheckpoint {
                    block_number: Some(new_checkpoint),
                    tx_number: Some(body_indices.last_tx_num()),
                    prune_mode: self.mode,
                });
                debug!(
                    target: "pruner",
                    static_file_checkpoint = ?input.previous_checkpoint,
                    "Using static file transaction checkpoint as TransactionLookup starting point"
                );
            }
        }

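        // Resolve the inclusive range of transaction numbers that is eligible for pruning in
        // this run.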
        let (start, end) = match input.get_next_tx_num_range(provider)? {
            Some(range) => range,
            None => {
                trace!(target: "pruner", "No transaction lookup entries to prune");
                return Ok(SegmentOutput::done())
            }
        }
        .into_inner();

        // Check where transaction hash numbers are stored
        #[cfg(all(unix, feature = "rocksdb"))]
        if provider.cached_storage_settings().storage_v2 {
            return self.prune_rocksdb(provider, input, start, end);
        }

        // For PruneMode::Full, clear the entire table in one operation
        if self.mode.is_full() {
            let pruned = provider.tx_ref().clear_table::<tables::TransactionHashNumbers>()?;
            trace!(target: "pruner", %pruned, "Cleared transaction lookup table");

            let last_pruned_block = provider
                .block_by_transaction_id(end)?
                .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;

            return Ok(SegmentOutput {
                progress: PruneProgress::Finished,
                pruned,
                checkpoint: Some(SegmentOutputCheckpoint {
                    block_number: Some(last_pruned_block),
                    tx_number: Some(end),
                }),
            });
        }

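        // Cap the end of the range by the limiter's remaining deleted-entries budget.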
        let tx_range = start..=
            Some(end)
                .min(
                    input
                        .limiter
                        .deleted_entries_limit_left()
                        // Use saturating addition here to avoid panicking on
                        // `deleted_entries_limit == usize::MAX`
                        .map(|left| start.saturating_add(left as u64) - 1),
                )
                .unwrap();
        let tx_range_end = *tx_range.end();

        // Retrieve transactions in the range and calculate their hashes in parallel
        let mut hashes = provider
            .transactions_by_tx_range(tx_range.clone())?
            .into_par_iter()
            .map(|transaction| transaction.trie_hash())
            .collect::<Vec<_>>();

        // Sort hashes to enable efficient cursor traversal through the TransactionHashNumbers
        // table, which is keyed by hash. Without sorting, each seek would be O(log n) random
        // access; with sorting, the cursor advances sequentially through the B+tree.
        hashes.sort_unstable();

        // Number of transactions retrieved from the database should match the tx range count
        let tx_count = tx_range.count();
        if hashes.len() != tx_count {
            return Err(PrunerError::InconsistentData(
                "Unexpected number of transaction hashes retrieved by transaction number range",
            ))
        }

        let mut limiter = input.limiter;

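        // Delete the hash -> number mappings, tracking the highest pruned transaction number so
        // the checkpoint can be derived below.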
        let mut last_pruned_transaction = None;
        let (pruned, done) =
            provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
                hashes,
                &mut limiter,
                |row| {
                    last_pruned_transaction =
                        Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
                },
            )?;

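        // Only report completion if the limiter-capped range actually reached the requested end.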
        let done = done && tx_range_end == end;
        trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");

        let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);

        let last_pruned_block = provider
            .block_by_transaction_id(last_pruned_transaction)?
            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
            // If there are more transaction lookup entries to prune, set the checkpoint block
            // number to the previous block, so we can finish pruning its transaction lookup
            // entries on the next run.
            .checked_sub(if done { 0 } else { 1 });

        let progress = limiter.progress(done);

        Ok(SegmentOutput {
            progress,
            pruned,
            checkpoint: Some(SegmentOutputCheckpoint {
                block_number: last_pruned_block,
                tx_number: Some(last_pruned_transaction),
            }),
        })
    }
}

impl TransactionLookup {
    /// Prunes transaction lookup when indices are stored in `RocksDB`.
    ///
    /// Reads transactions from static files and deletes corresponding entries
    /// from the `RocksDB` `TransactionHashNumbers` table.
    #[cfg(all(unix, feature = "rocksdb"))]
    fn prune_rocksdb<Provider>(
        &self,
        provider: &Provider,
        input: PruneInput,
        start: alloy_primitives::TxNumber,
        end: alloy_primitives::TxNumber,
    ) -> Result<SegmentOutput, PrunerError>
    where
        Provider: DBProvider
            + BlockReader<Transaction: Encodable2718>
            + StaticFileProviderFactory
            + RocksDBProviderFactory,
    {
        // For PruneMode::Full, clear the entire RocksDB table in one operation
        if self.mode.is_full() {
            let rocksdb = provider.rocksdb_provider();
            rocksdb.clear::<tables::TransactionHashNumbers>()?;
            trace!(target: "pruner", "Cleared transaction lookup table (RocksDB)");

            let last_pruned_block = provider
                .block_by_transaction_id(end)?
                .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;

            return Ok(SegmentOutput {
                progress: PruneProgress::Finished,
                pruned: 0, // RocksDB clear doesn't return count
                checkpoint: Some(SegmentOutputCheckpoint {
                    block_number: Some(last_pruned_block),
                    tx_number: Some(end),
                }),
            });
        }

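        // Cap the end of the range by the limiter's remaining deleted-entries budget; with no
        // limit, the range extends to `end`.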
        let tx_range_end = input
            .limiter
            .deleted_entries_limit_left()
            .map(|left| start.saturating_add(left as u64).saturating_sub(1))
            .map_or(end, |limited| limited.min(end));
        let tx_range = start..=tx_range_end;

        // Retrieve transactions in the range and calculate their hashes in parallel
        let hashes: Vec<_> = provider
            .transactions_by_tx_range(tx_range.clone())?
            .into_par_iter()
            .map(|transaction| transaction.trie_hash())
            .collect();

        // Number of transactions retrieved from the database should match the tx range count
        let tx_count = tx_range.count();
        if hashes.len() != tx_count {
            return Err(PrunerError::InconsistentData(
                "Unexpected number of transaction hashes retrieved by transaction number range",
            ))
        }

        let mut limiter = input.limiter;

        // Delete transaction hash -> number mappings from RocksDB
        let mut deleted = 0usize;
        provider.with_rocksdb_batch(|mut batch| {
            for hash in &hashes {
                if limiter.is_limit_reached() {
                    break;
                }
                batch.delete::<tables::TransactionHashNumbers>(*hash)?;
                limiter.increment_deleted_entries_count();
                deleted += 1;
            }
            Ok(((), Some(batch.into_inner())))
        })?;

        let done = deleted == hashes.len() && tx_range_end == end;
        trace!(target: "pruner", %deleted, %done, "Pruned transaction lookup (RocksDB)");

        let last_pruned_transaction =
            if deleted > 0 { start + deleted as u64 - 1 } else { tx_range_end };

        let last_pruned_block = provider
            .block_by_transaction_id(last_pruned_transaction)?
            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
            .checked_sub(if done { 0 } else { 1 });

        let progress = limiter.progress(done);

        Ok(SegmentOutput {
            progress,
            pruned: deleted,
            checkpoint: Some(SegmentOutputCheckpoint {
                block_number: last_pruned_block,
                tx_number: Some(last_pruned_transaction),
            }),
        })
    }
}

#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use itertools::{
        FoldWhile::{Continue, Done},
        Itertools,
    };
    use reth_db_api::tables;
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
    use std::ops::Sub;

    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            tx_hash_numbers.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_numbers.len() as u64));
            }
        }
        let tx_hash_numbers_len = tx_hash_numbers.len();
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        assert_eq!(
            db.count_entries::<tables::Transactions>().unwrap(),
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
        );
        assert_eq!(
            db.count_entries::<tables::Transactions>().unwrap(),
            db.table::<tables::TransactionHashNumbers>().unwrap().len()
        );

        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = TransactionLookup::new(prune_mode);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap(),
                to_block,
                limiter: limiter.clone(),
            };

            let next_tx_number_to_prune = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::TransactionLookup)
                .unwrap()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map(|tx_number| tx_number + 1)
                .unwrap_or_default();

            let last_pruned_tx_number = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum::<usize>()
                .min(
                    next_tx_number_to_prune as usize +
                        input.limiter.deleted_entries_limit().unwrap(),
                )
                .sub(1);

            let last_pruned_block_number = blocks
                .iter()
                .fold_while((0, 0), |(_, mut tx_count), block| {
                    tx_count += block.transaction_count();

                    if tx_count > last_pruned_tx_number {
                        Done((block.number, tx_count))
                    } else {
                        Continue((block.number, tx_count))
                    }
                })
                .into_inner()
                .0;

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            let last_pruned_block_number = last_pruned_block_number
                .checked_sub(if result.progress.is_finished() { 0 } else { 1 });

            assert_eq!(
                db.table::<tables::TransactionHashNumbers>().unwrap().len(),
                tx_hash_numbers_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        test_prune(
            6,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
        );
        test_prune(6, (PruneProgress::Finished, 2));
        test_prune(10, (PruneProgress::Finished, 8));
    }

    #[cfg(all(unix, feature = "rocksdb"))]
    #[test]
    fn prune_rocksdb() {
        use reth_db_api::models::StorageSettings;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettingsCache;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Collect transaction hashes and their tx numbers
        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            tx_hash_numbers.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_numbers.len() as u64));
            }
        }
        let tx_hash_numbers_len = tx_hash_numbers.len();

        // Insert into RocksDB instead of MDBX
        {
            let rocksdb = db.factory.rocksdb_provider();
            let mut batch = rocksdb.batch();
            for (hash, tx_num) in &tx_hash_numbers {
                batch.put::<tables::TransactionHashNumbers>(*hash, tx_num).unwrap();
            }
            batch.commit().expect("commit rocksdb batch");
        }

        // Verify RocksDB has all entries
        {
            let rocksdb = db.factory.rocksdb_provider();
            for (hash, expected_tx_num) in &tx_hash_numbers {
                let actual = rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap();
                assert_eq!(actual, Some(*expected_tx_num));
            }
        }

        let to_block: BlockNumber = 6;
        let prune_mode = PruneMode::Before(to_block);
        let input =
            PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
        let segment = TransactionLookup::new(prune_mode);

        // Enable RocksDB storage for transaction hash numbers
        db.factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = db.factory.database_provider_rw().unwrap();
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        assert_matches!(
            result,
            SegmentOutput { progress: PruneProgress::Finished, pruned, checkpoint: Some(_) }
                if pruned > 0
        );

        // Calculate expected: blocks 1-6 should have their tx hashes pruned
        let txs_up_to_block_6: usize = blocks.iter().take(6).map(|b| b.transaction_count()).sum();

        // Verify RocksDB entries: first `txs_up_to_block_6` should be gone
        {
            let rocksdb = db.factory.rocksdb_provider();
            for (i, (hash, _)) in tx_hash_numbers.iter().enumerate() {
                let entry = rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap();
                if i < txs_up_to_block_6 {
                    assert!(entry.is_none(), "Entry {} (hash {:?}) should be pruned", i, hash);
                } else {
                    assert!(entry.is_some(), "Entry {} (hash {:?}) should still exist", i, hash);
                }
            }
        }

        // Verify remaining count
        {
            let rocksdb = db.factory.rocksdb_provider();
            let remaining: Vec<_> =
                rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().collect();
            assert_eq!(
                remaining.len(),
                tx_hash_numbers_len - txs_up_to_block_6,
                "Remaining RocksDB entries should match expected"
            );
        }
    }

    /// Tests that when the `RocksDB` prune deletes nothing (limit exhausted), the checkpoint
    /// doesn't advance.
    ///
    /// This test simulates a scenario where:
    /// 1. Some transactions have already been pruned (checkpoint at tx 5)
    /// 2. The deleted entries limit is exhausted before any new deletions
    /// 3. The checkpoint should NOT advance to the next start position
    #[cfg(all(unix, feature = "rocksdb"))]
    #[test]
    fn prune_rocksdb_zero_deleted_checkpoint() {
        use reth_db_api::models::StorageSettings;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettingsCache;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Collect transaction hashes and their tx numbers
        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            tx_hash_numbers.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_numbers.len() as u64));
            }
        }

        // Insert into RocksDB
        {
            let rocksdb = db.factory.rocksdb_provider();
            let mut batch = rocksdb.batch();
            for (hash, tx_num) in &tx_hash_numbers {
                batch.put::<tables::TransactionHashNumbers>(*hash, tx_num).unwrap();
            }
            batch.commit().expect("commit rocksdb batch");
        }

        // Enable RocksDB storage for transaction hash numbers
        db.factory.set_storage_settings_cache(StorageSettings::v2());

        let to_block: BlockNumber = 6;
        let prune_mode = PruneMode::Before(to_block);

        // Simulate that we've already pruned up to tx 5, so start will be tx 6
        let previous_checkpoint =
            Some(PruneCheckpoint { block_number: Some(2), tx_number: Some(5), prune_mode });

        // Create a limiter with limit of 1, but exhaust it before pruning
        // This means deleted_entries_limit_left() = Some(0)
        let mut limiter = PruneLimiter::default().set_deleted_entries_limit(1);
        limiter.increment_deleted_entries_count(); // Exhaust the limit

        let input = PruneInput { previous_checkpoint, to_block, limiter };
        let segment = TransactionLookup::new(prune_mode);

        let provider = db.factory.database_provider_rw().unwrap();
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        // With an exhausted limit, nothing should be deleted
        assert_eq!(result.pruned, 0, "Nothing should be pruned with exhausted limit");

        // The checkpoint tx_number should NOT advance to 6 (start)
        // With the bug: checkpoint.tx_number = start = 6 (WRONG - claims tx 6 was pruned)
        // With the fix: checkpoint.tx_number = tx_range_end = 5 (correct - no advancement)
        if let Some(checkpoint) = &result.checkpoint {
            assert_eq!(
                checkpoint.tx_number,
                Some(5),
                "Checkpoint should stay at 5 (previous), not advance to 6 (start)"
            );
        }

        // All RocksDB entries should still exist (nothing was actually deleted)
        {
            let rocksdb = db.factory.rocksdb_provider();
            let remaining: Vec<_> =
                rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().collect();
            assert_eq!(
                remaining.len(),
                tx_hash_numbers.len(),
                "All RocksDB entries should still exist"
            );
        }
    }
}