// reth_prune/segments/user/transaction_lookup.rs
//
// Pruner segment for the transaction hash -> transaction number lookup index.
use crate::{
    db_ext::DbTxPruneExt,
    segments::{PruneInput, Segment, SegmentOutput},
    PrunerError,
};
use alloy_eips::eip2718::Encodable2718;
use rayon::prelude::*;
use reth_db_api::{tables, transaction::DbTxMut};
use reth_provider::{
    BlockReader, DBProvider, PruneCheckpointReader, RocksDBProviderFactory,
    StaticFileProviderFactory,
};
use reth_prune_types::{
    PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::StorageSettingsCache;
use tracing::{debug, instrument, trace};

/// Prune segment responsible for the `TransactionHashNumbers` table (transaction hash ->
/// transaction number lookup index).
#[derive(Debug)]
pub struct TransactionLookup {
    /// Pruning configuration for this segment, e.g. prune everything or only before a block.
    mode: PruneMode,
}

impl TransactionLookup {
    /// Creates a new transaction lookup prune segment with the given prune mode.
    pub const fn new(mode: PruneMode) -> Self {
        Self { mode }
    }
}

31impl<Provider> Segment<Provider> for TransactionLookup
32where
33    Provider: DBProvider<Tx: DbTxMut>
34        + BlockReader<Transaction: Encodable2718>
35        + PruneCheckpointReader
36        + StaticFileProviderFactory
37        + StorageSettingsCache
38        + RocksDBProviderFactory,
39{
40    fn segment(&self) -> PruneSegment {
41        PruneSegment::TransactionLookup
42    }
43
44    fn mode(&self) -> Option<PruneMode> {
45        Some(self.mode)
46    }
47
48    fn purpose(&self) -> PrunePurpose {
49        PrunePurpose::User
50    }
51
52    #[instrument(
53        name = "TransactionLookup::prune",
54        target = "pruner",
55        skip(self, provider),
56        ret(level = "trace")
57    )]
58    fn prune(
59        &self,
60        provider: &Provider,
61        mut input: PruneInput,
62    ) -> Result<SegmentOutput, PrunerError> {
63        // It is not possible to prune TransactionLookup data for which we don't have transaction
64        // data. If the TransactionLookup checkpoint is lagging behind (which can happen e.g. when
65        // pre-merge history is dropped and then later tx lookup pruning is enabled) then we can
66        // only prune from the lowest static file.
67        if let Some(lowest_range) =
68            provider.static_file_provider().get_lowest_range(StaticFileSegment::Transactions) &&
69            input
70                .previous_checkpoint
71                .is_none_or(|checkpoint| checkpoint.block_number < Some(lowest_range.start()))
72        {
73            let new_checkpoint = lowest_range.start().saturating_sub(1);
74            if let Some(body_indices) = provider.block_body_indices(new_checkpoint)? {
75                input.previous_checkpoint = Some(PruneCheckpoint {
76                    block_number: Some(new_checkpoint),
77                    tx_number: Some(body_indices.last_tx_num()),
78                    prune_mode: self.mode,
79                });
80                debug!(
81                    target: "pruner",
82                    static_file_checkpoint = ?input.previous_checkpoint,
83                    "Using static file transaction checkpoint as TransactionLookup starting point"
84                );
85            }
86        }
87
88        let (start, end) = match input.get_next_tx_num_range(provider)? {
89            Some(range) => range,
90            None => {
91                trace!(target: "pruner", "No transaction lookup entries to prune");
92                return Ok(SegmentOutput::done())
93            }
94        }
95        .into_inner();
96
97        // Check where transaction hash numbers are stored
98        if provider.cached_storage_settings().storage_v2 {
99            return self.prune_rocksdb(provider, input, start, end);
100        }
101
102        // For PruneMode::Full, clear the entire table in one operation
103        if self.mode.is_full() {
104            let pruned = provider.tx_ref().clear_table::<tables::TransactionHashNumbers>()?;
105            trace!(target: "pruner", %pruned, "Cleared transaction lookup table");
106
107            let last_pruned_block = provider
108                .block_by_transaction_id(end)?
109                .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;
110
111            return Ok(SegmentOutput {
112                progress: PruneProgress::Finished,
113                pruned,
114                checkpoint: Some(SegmentOutputCheckpoint {
115                    block_number: Some(last_pruned_block),
116                    tx_number: Some(end),
117                }),
118            });
119        }
120
121        let tx_range = start..=
122            Some(end)
123                .min(
124                    input
125                        .limiter
126                        .deleted_entries_limit_left()
127                        // Use saturating addition here to avoid panicking on
128                        // `deleted_entries_limit == usize::MAX`
129                        .map(|left| start.saturating_add(left as u64) - 1),
130                )
131                .unwrap();
132        let tx_range_end = *tx_range.end();
133
134        // Retrieve transactions in the range and calculate their hashes in parallel
135        let mut hashes = provider
136            .transactions_by_tx_range(tx_range.clone())?
137            .into_par_iter()
138            .map(|transaction| transaction.trie_hash())
139            .collect::<Vec<_>>();
140
141        // Sort hashes to enable efficient cursor traversal through the TransactionHashNumbers
142        // table, which is keyed by hash. Without sorting, each seek would be O(log n) random
143        // access; with sorting, the cursor advances sequentially through the B+tree.
144        hashes.sort_unstable();
145
146        // Number of transactions retrieved from the database should match the tx range count
147        let tx_count = tx_range.count();
148        if hashes.len() != tx_count {
149            return Err(PrunerError::InconsistentData(
150                "Unexpected number of transaction hashes retrieved by transaction number range",
151            ))
152        }
153
154        let mut limiter = input.limiter;
155
156        let mut last_pruned_transaction = None;
157        let (pruned, done) =
158            provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
159                hashes,
160                &mut limiter,
161                |row| {
162                    last_pruned_transaction =
163                        Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
164                },
165            )?;
166
167        let done = done && tx_range_end == end;
168        trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");
169
170        let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);
171
172        let last_pruned_block = provider
173            .block_by_transaction_id(last_pruned_transaction)?
174            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
175            // If there's more transaction lookup entries to prune, set the checkpoint block number
176            // to previous, so we could finish pruning its transaction lookup entries on the next
177            // run.
178            .checked_sub(if done { 0 } else { 1 });
179
180        let progress = limiter.progress(done);
181
182        Ok(SegmentOutput {
183            progress,
184            pruned,
185            checkpoint: Some(SegmentOutputCheckpoint {
186                block_number: last_pruned_block,
187                tx_number: Some(last_pruned_transaction),
188            }),
189        })
190    }
191}
192
193impl TransactionLookup {
194    /// Prunes transaction lookup when indices are stored in `RocksDB`.
195    ///
196    /// Reads transactions from static files and deletes corresponding entries
197    /// from the `RocksDB` `TransactionHashNumbers` table.
198    fn prune_rocksdb<Provider>(
199        &self,
200        provider: &Provider,
201        input: PruneInput,
202        start: alloy_primitives::TxNumber,
203        end: alloy_primitives::TxNumber,
204    ) -> Result<SegmentOutput, PrunerError>
205    where
206        Provider: DBProvider
207            + BlockReader<Transaction: Encodable2718>
208            + StaticFileProviderFactory
209            + RocksDBProviderFactory,
210    {
211        // For PruneMode::Full, clear the entire RocksDB table in one operation
212        if self.mode.is_full() {
213            let rocksdb = provider.rocksdb_provider();
214            rocksdb.clear::<tables::TransactionHashNumbers>()?;
215            trace!(target: "pruner", "Cleared transaction lookup table (RocksDB)");
216
217            let last_pruned_block = provider
218                .block_by_transaction_id(end)?
219                .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;
220
221            return Ok(SegmentOutput {
222                progress: PruneProgress::Finished,
223                pruned: 0, // RocksDB clear doesn't return count
224                checkpoint: Some(SegmentOutputCheckpoint {
225                    block_number: Some(last_pruned_block),
226                    tx_number: Some(end),
227                }),
228            });
229        }
230
231        let tx_range_end = input
232            .limiter
233            .deleted_entries_limit_left()
234            .map(|left| start.saturating_add(left as u64).saturating_sub(1))
235            .map_or(end, |limited| limited.min(end));
236        let tx_range = start..=tx_range_end;
237
238        // Retrieve transactions in the range and calculate their hashes in parallel
239        let hashes: Vec<_> = provider
240            .transactions_by_tx_range(tx_range.clone())?
241            .into_par_iter()
242            .map(|transaction| transaction.trie_hash())
243            .collect();
244
245        // Number of transactions retrieved from the database should match the tx range count
246        let tx_count = tx_range.count();
247        if hashes.len() != tx_count {
248            return Err(PrunerError::InconsistentData(
249                "Unexpected number of transaction hashes retrieved by transaction number range",
250            ))
251        }
252
253        let mut limiter = input.limiter;
254
255        // Delete transaction hash -> number mappings from RocksDB
256        let mut deleted = 0usize;
257        provider.with_rocksdb_batch(|mut batch| {
258            for hash in &hashes {
259                if limiter.is_limit_reached() {
260                    break;
261                }
262                batch.delete::<tables::TransactionHashNumbers>(*hash)?;
263                limiter.increment_deleted_entries_count();
264                deleted += 1;
265            }
266            Ok(((), Some(batch.into_inner())))
267        })?;
268
269        let done = deleted == hashes.len() && tx_range_end == end;
270        trace!(target: "pruner", %deleted, %done, "Pruned transaction lookup (RocksDB)");
271
272        let last_pruned_transaction =
273            if deleted > 0 { start + deleted as u64 - 1 } else { tx_range_end };
274
275        let last_pruned_block = provider
276            .block_by_transaction_id(last_pruned_transaction)?
277            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
278            .checked_sub(if done { 0 } else { 1 });
279
280        let progress = limiter.progress(done);
281
282        Ok(SegmentOutput {
283            progress,
284            pruned: deleted,
285            checkpoint: Some(SegmentOutputCheckpoint {
286                block_number: last_pruned_block,
287                tx_number: Some(last_pruned_transaction),
288            }),
289        })
290    }
291}
292
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use itertools::{
        FoldWhile::{Continue, Done},
        Itertools,
    };
    use reth_db_api::tables;
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
    use std::ops::Sub;

    /// MDBX path: runs the segment repeatedly with a deleted-entries limit of 10 and checks
    /// progress, pruned counts, remaining table contents and the saved checkpoint after each run.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Build (hash, tx_number) pairs in global tx-number order across all blocks.
        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            tx_hash_numbers.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_numbers.len() as u64));
            }
        }
        let tx_hash_numbers_len = tx_hash_numbers.len();
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        assert_eq!(
            db.count_entries::<tables::Transactions>().unwrap(),
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
        );
        assert_eq!(
            db.count_entries::<tables::Transactions>().unwrap(),
            db.table::<tables::TransactionHashNumbers>().unwrap().len()
        );

        // One prune run: computes the expected last pruned tx/block independently, runs the
        // segment, then verifies the output, remaining rows and the persisted checkpoint.
        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = TransactionLookup::new(prune_mode);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap(),
                to_block,
                limiter: limiter.clone(),
            };

            let next_tx_number_to_prune = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::TransactionLookup)
                .unwrap()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map(|tx_number| tx_number + 1)
                .unwrap_or_default();

            // Expected last pruned tx: end of target range, capped by the deletion limit.
            let last_pruned_tx_number = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum::<usize>()
                .min(
                    next_tx_number_to_prune as usize +
                        input.limiter.deleted_entries_limit().unwrap(),
                )
                .sub(1);

            // Expected last pruned block: first block whose cumulative tx count exceeds the
            // last pruned tx number.
            let last_pruned_block_number = blocks
                .iter()
                .fold_while((0, 0), |(_, mut tx_count), block| {
                    tx_count += block.transaction_count();

                    if tx_count > last_pruned_tx_number {
                        Done((block.number, tx_count))
                    } else {
                        Continue((block.number, tx_count))
                    }
                })
                .into_inner()
                .0;

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // When not finished, the checkpoint block is stepped back by one (see `prune`).
            let last_pruned_block_number = last_pruned_block_number
                .checked_sub(if result.progress.is_finished() { 0 } else { 1 });

            assert_eq!(
                db.table::<tables::TransactionHashNumbers>().unwrap().len(),
                tx_hash_numbers_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        // First run hits the 10-entry limit; second finishes block 6; third finishes block 10.
        test_prune(
            6,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
        );
        test_prune(6, (PruneProgress::Finished, 2));
        test_prune(10, (PruneProgress::Finished, 8));
    }

    /// RocksDB (storage v2) path: prunes everything before block 6 with no limit and verifies
    /// exactly the first blocks' hashes are removed from RocksDB.
    #[test]
    fn prune_rocksdb() {
        use reth_db_api::models::StorageSettings;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettingsCache;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Collect transaction hashes and their tx numbers
        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            tx_hash_numbers.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_numbers.len() as u64));
            }
        }
        let tx_hash_numbers_len = tx_hash_numbers.len();

        // Insert into RocksDB instead of MDBX
        {
            let rocksdb = db.factory.rocksdb_provider();
            let mut batch = rocksdb.batch();
            for (hash, tx_num) in &tx_hash_numbers {
                batch.put::<tables::TransactionHashNumbers>(*hash, tx_num).unwrap();
            }
            batch.commit().expect("commit rocksdb batch");
        }

        // Verify RocksDB has all entries
        {
            let rocksdb = db.factory.rocksdb_provider();
            for (hash, expected_tx_num) in &tx_hash_numbers {
                let actual = rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap();
                assert_eq!(actual, Some(*expected_tx_num));
            }
        }

        let to_block: BlockNumber = 6;
        let prune_mode = PruneMode::Before(to_block);
        let input =
            PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
        let segment = TransactionLookup::new(prune_mode);

        // Enable RocksDB storage for transaction hash numbers
        db.factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = db.factory.database_provider_rw().unwrap();
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        assert_matches!(
            result,
            SegmentOutput { progress: PruneProgress::Finished, pruned, checkpoint: Some(_) }
                if pruned > 0
        );

        // Calculate expected: blocks 1-6 should have their tx hashes pruned
        let txs_up_to_block_6: usize = blocks.iter().take(6).map(|b| b.transaction_count()).sum();

        // Verify RocksDB entries: first `txs_up_to_block_6` should be gone
        {
            let rocksdb = db.factory.rocksdb_provider();
            for (i, (hash, _)) in tx_hash_numbers.iter().enumerate() {
                let entry = rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap();
                if i < txs_up_to_block_6 {
                    assert!(entry.is_none(), "Entry {} (hash {:?}) should be pruned", i, hash);
                } else {
                    assert!(entry.is_some(), "Entry {} (hash {:?}) should still exist", i, hash);
                }
            }
        }

        // Verify remaining count
        {
            let rocksdb = db.factory.rocksdb_provider();
            let remaining: Vec<_> =
                rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().collect();
            assert_eq!(
                remaining.len(),
                tx_hash_numbers_len - txs_up_to_block_6,
                "Remaining RocksDB entries should match expected"
            );
        }
    }

    /// Tests that when `RocksDB` prune deletes nothing (limit exhausted), checkpoint doesn't
    /// advance.
    ///
    /// This test simulates a scenario where:
    /// 1. Some transactions have already been pruned (checkpoint at tx 5)
    /// 2. The deleted entries limit is exhausted before any new deletions
    /// 3. The checkpoint should NOT advance to the next start position
    #[test]
    fn prune_rocksdb_zero_deleted_checkpoint() {
        use reth_db_api::models::StorageSettings;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettingsCache;

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Collect transaction hashes and their tx numbers
        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            tx_hash_numbers.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_numbers.len() as u64));
            }
        }

        // Insert into RocksDB
        {
            let rocksdb = db.factory.rocksdb_provider();
            let mut batch = rocksdb.batch();
            for (hash, tx_num) in &tx_hash_numbers {
                batch.put::<tables::TransactionHashNumbers>(*hash, tx_num).unwrap();
            }
            batch.commit().expect("commit rocksdb batch");
        }

        // Enable RocksDB storage for transaction hash numbers
        db.factory.set_storage_settings_cache(StorageSettings::v2());

        let to_block: BlockNumber = 6;
        let prune_mode = PruneMode::Before(to_block);

        // Simulate that we've already pruned up to tx 5, so start will be tx 6
        let previous_checkpoint =
            Some(PruneCheckpoint { block_number: Some(2), tx_number: Some(5), prune_mode });

        // Create a limiter with limit of 1, but exhaust it before pruning
        // This means deleted_entries_limit_left() = Some(0)
        let mut limiter = PruneLimiter::default().set_deleted_entries_limit(1);
        limiter.increment_deleted_entries_count(); // Exhaust the limit

        let input = PruneInput { previous_checkpoint, to_block, limiter };
        let segment = TransactionLookup::new(prune_mode);

        let provider = db.factory.database_provider_rw().unwrap();
        let result = segment.prune(&provider, input).unwrap();
        provider.commit().expect("commit");

        // With an exhausted limit, nothing should be deleted
        assert_eq!(result.pruned, 0, "Nothing should be pruned with exhausted limit");

        // The checkpoint tx_number should NOT advance to 6 (start)
        // With the bug: checkpoint.tx_number = start = 6 (WRONG - claims tx 6 was pruned)
        // With the fix: checkpoint.tx_number = tx_range_end = 5 (correct - no advancement)
        if let Some(checkpoint) = &result.checkpoint {
            assert_eq!(
                checkpoint.tx_number,
                Some(5),
                "Checkpoint should stay at 5 (previous), not advance to 6 (start)"
            );
        }

        // All RocksDB entries should still exist (nothing was actually deleted)
        {
            let rocksdb = db.factory.rocksdb_provider();
            let remaining: Vec<_> =
                rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().collect();
            assert_eq!(
                remaining.len(),
                tx_hash_numbers.len(),
                "All RocksDB entries should still exist"
            );
        }
    }
}