reth_prune/segments/user/
transaction_lookup.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{PruneInput, Segment, SegmentOutput},
4    PrunerError,
5};
6use alloy_eips::eip2718::Encodable2718;
7use rayon::prelude::*;
8use reth_db_api::{tables, transaction::DbTxMut};
9use reth_provider::{BlockReader, DBProvider, PruneCheckpointReader, StaticFileProviderFactory};
10use reth_prune_types::{
11    PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutputCheckpoint,
12};
13use reth_static_file_types::StaticFileSegment;
14use tracing::{debug, instrument, trace};
15
16#[derive(Debug)]
17pub struct TransactionLookup {
18    mode: PruneMode,
19}
20
21impl TransactionLookup {
22    pub const fn new(mode: PruneMode) -> Self {
23        Self { mode }
24    }
25}
26
27impl<Provider> Segment<Provider> for TransactionLookup
28where
29    Provider: DBProvider<Tx: DbTxMut>
30        + BlockReader<Transaction: Encodable2718>
31        + PruneCheckpointReader
32        + StaticFileProviderFactory,
33{
34    fn segment(&self) -> PruneSegment {
35        PruneSegment::TransactionLookup
36    }
37
38    fn mode(&self) -> Option<PruneMode> {
39        Some(self.mode)
40    }
41
42    fn purpose(&self) -> PrunePurpose {
43        PrunePurpose::User
44    }
45
46    #[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
47    fn prune(
48        &self,
49        provider: &Provider,
50        mut input: PruneInput,
51    ) -> Result<SegmentOutput, PrunerError> {
52        // It is not possible to prune TransactionLookup data for which we don't have transaction
53        // data. If the TransactionLookup checkpoint is lagging behind (which can happen e.g. when
54        // pre-merge history is dropped and then later tx lookup pruning is enabled) then we can
55        // only prune from the lowest static file.
56        if let Some(lowest_range) =
57            provider.static_file_provider().get_lowest_range(StaticFileSegment::Transactions) &&
58            input
59                .previous_checkpoint
60                .is_none_or(|checkpoint| checkpoint.block_number < Some(lowest_range.start()))
61        {
62            let new_checkpoint = lowest_range.start().saturating_sub(1);
63            if let Some(body_indices) = provider.block_body_indices(new_checkpoint)? {
64                input.previous_checkpoint = Some(PruneCheckpoint {
65                    block_number: Some(new_checkpoint),
66                    tx_number: Some(body_indices.last_tx_num()),
67                    prune_mode: self.mode,
68                });
69                debug!(
70                    target: "pruner",
71                    static_file_checkpoint = ?input.previous_checkpoint,
72                    "Using static file transaction checkpoint as TransactionLookup starting point"
73                );
74            }
75        }
76
77        let (start, end) = match input.get_next_tx_num_range(provider)? {
78            Some(range) => range,
79            None => {
80                trace!(target: "pruner", "No transaction lookup entries to prune");
81                return Ok(SegmentOutput::done())
82            }
83        }
84        .into_inner();
85
86        // For PruneMode::Full, clear the entire table in one operation
87        if self.mode.is_full() {
88            let pruned = provider.tx_ref().clear_table::<tables::TransactionHashNumbers>()?;
89            trace!(target: "pruner", %pruned, "Cleared transaction lookup table");
90
91            let last_pruned_block = provider
92                .block_by_transaction_id(end)?
93                .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;
94
95            return Ok(SegmentOutput {
96                progress: PruneProgress::Finished,
97                pruned,
98                checkpoint: Some(SegmentOutputCheckpoint {
99                    block_number: Some(last_pruned_block),
100                    tx_number: Some(end),
101                }),
102            });
103        }
104
105        let tx_range = start..=
106            Some(end)
107                .min(
108                    input
109                        .limiter
110                        .deleted_entries_limit_left()
111                        // Use saturating addition here to avoid panicking on
112                        // `deleted_entries_limit == usize::MAX`
113                        .map(|left| start.saturating_add(left as u64) - 1),
114                )
115                .unwrap();
116        let tx_range_end = *tx_range.end();
117
118        // Retrieve transactions in the range and calculate their hashes in parallel
119        let mut hashes = provider
120            .transactions_by_tx_range(tx_range.clone())?
121            .into_par_iter()
122            .map(|transaction| transaction.trie_hash())
123            .collect::<Vec<_>>();
124
125        // Sort hashes to enable efficient cursor traversal through the TransactionHashNumbers
126        // table, which is keyed by hash. Without sorting, each seek would be O(log n) random
127        // access; with sorting, the cursor advances sequentially through the B+tree.
128        hashes.sort_unstable();
129
130        // Number of transactions retrieved from the database should match the tx range count
131        let tx_count = tx_range.count();
132        if hashes.len() != tx_count {
133            return Err(PrunerError::InconsistentData(
134                "Unexpected number of transaction hashes retrieved by transaction number range",
135            ))
136        }
137
138        let mut limiter = input.limiter;
139
140        let mut last_pruned_transaction = None;
141        let (pruned, done) =
142            provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
143                hashes,
144                &mut limiter,
145                |row| {
146                    last_pruned_transaction =
147                        Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
148                },
149            )?;
150
151        let done = done && tx_range_end == end;
152        trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");
153
154        let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);
155
156        let last_pruned_block = provider
157            .block_by_transaction_id(last_pruned_transaction)?
158            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
159            // If there's more transaction lookup entries to prune, set the checkpoint block number
160            // to previous, so we could finish pruning its transaction lookup entries on the next
161            // run.
162            .checked_sub(if done { 0 } else { 1 });
163
164        let progress = limiter.progress(done);
165
166        Ok(SegmentOutput {
167            progress,
168            pruned,
169            checkpoint: Some(SegmentOutputCheckpoint {
170                block_number: last_pruned_block,
171                tx_number: Some(last_pruned_transaction),
172            }),
173        })
174    }
175}
176
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::tables;
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    /// Runs the segment three times against ten random blocks.
    ///
    /// After every run it checks:
    /// - the reported progress and pruned count;
    /// - the remaining `TransactionHashNumbers` entries;
    /// - the persisted checkpoint.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Pair every transaction hash with its sequential transaction number across all blocks.
        let tx_hash_numbers: Vec<_> = blocks
            .iter()
            .flat_map(|block| &block.body().transactions)
            .enumerate()
            .map(|(tx_number, transaction)| (*transaction.tx_hash(), tx_number as u64))
            .collect();
        let tx_hash_numbers_len = tx_hash_numbers.len();
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        let expected_transactions =
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>();
        let stored_transactions = db.count_entries::<tables::Transactions>().unwrap();
        assert_eq!(stored_transactions, expected_transactions);
        assert_eq!(
            stored_transactions,
            db.table::<tables::TransactionHashNumbers>().unwrap().len()
        );

        // Reads the TransactionLookup checkpoint currently persisted in the database.
        let persisted_checkpoint = || {
            db.factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::TransactionLookup)
                .unwrap()
        };

        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = TransactionLookup::new(prune_mode);

            let previous_checkpoint = persisted_checkpoint();
            // First transaction this run starts from: one past the checkpointed transaction, or
            // zero when nothing has been pruned yet.
            let next_tx_number_to_prune = previous_checkpoint
                .as_ref()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map_or(0, |tx_number| tx_number + 1);
            let input = PruneInput {
                previous_checkpoint,
                to_block,
                limiter: PruneLimiter::default().set_deleted_entries_limit(10),
            };

            // Highest transaction expected to be pruned: the last one up to `to_block`, capped
            // by the deleted entries limit.
            let transactions_up_to_block = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum::<usize>();
            let deleted_entries_limit = input.limiter.deleted_entries_limit().unwrap();
            let last_pruned_tx_number = transactions_up_to_block
                .min(next_tx_number_to_prune as usize + deleted_entries_limit) -
                1;

            // Number of the first block whose cumulative transaction count exceeds
            // `last_pruned_tx_number` (the last block if none does).
            let mut last_pruned_block_number: BlockNumber = 0;
            let mut transactions_seen = 0;
            for block in &blocks {
                transactions_seen += block.transaction_count();
                last_pruned_block_number = block.number;
                if transactions_seen > last_pruned_tx_number {
                    break
                }
            }

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            assert_matches!(
                result,
                SegmentOutput { progress, pruned, checkpoint: Some(_) }
                    if (progress, pruned) == expected_result
            );

            let checkpoint = result.checkpoint.unwrap().as_prune_checkpoint(prune_mode);
            segment.save_checkpoint(&provider, checkpoint).unwrap();
            provider.commit().expect("commit");

            // An unfinished run keeps the checkpoint one block behind so that the block is
            // revisited on the next run.
            let expected_checkpoint_block = last_pruned_block_number
                .checked_sub(if result.progress.is_finished() { 0 } else { 1 });

            assert_eq!(
                db.table::<tables::TransactionHashNumbers>().unwrap().len(),
                tx_hash_numbers_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                persisted_checkpoint(),
                Some(PruneCheckpoint {
                    block_number: expected_checkpoint_block,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        let runs = [
            (6, (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10)),
            (6, (PruneProgress::Finished, 2)),
            (10, (PruneProgress::Finished, 8)),
        ];
        for (to_block, expected_result) in runs {
            test_prune(to_block, expected_result);
        }
    }
}