reth_prune/segments/user/
transaction_lookup.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{PruneInput, Segment, SegmentOutput},
4    PrunerError,
5};
6use alloy_eips::eip2718::Encodable2718;
7use rayon::prelude::*;
8use reth_db_api::{tables, transaction::DbTxMut};
9use reth_provider::{BlockReader, DBProvider, PruneCheckpointReader};
10use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
11use tracing::{debug, instrument, trace};
12
13#[derive(Debug)]
14pub struct TransactionLookup {
15    mode: PruneMode,
16}
17
18impl TransactionLookup {
19    pub const fn new(mode: PruneMode) -> Self {
20        Self { mode }
21    }
22}
23
24impl<Provider> Segment<Provider> for TransactionLookup
25where
26    Provider:
27        DBProvider<Tx: DbTxMut> + BlockReader<Transaction: Encodable2718> + PruneCheckpointReader,
28{
29    fn segment(&self) -> PruneSegment {
30        PruneSegment::TransactionLookup
31    }
32
33    fn mode(&self) -> Option<PruneMode> {
34        Some(self.mode)
35    }
36
37    fn purpose(&self) -> PrunePurpose {
38        PrunePurpose::User
39    }
40
41    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
42    fn prune(
43        &self,
44        provider: &Provider,
45        mut input: PruneInput,
46    ) -> Result<SegmentOutput, PrunerError> {
47        // It is not possible to prune TransactionLookup data for which we don't have transaction
48        // data. If the TransactionLookup checkpoint is lagging behind (which can happen e.g. when
49        // pre-merge history is dropped and then later tx lookup pruning is enabled) then we can
50        // only prune from the tx checkpoint and onwards.
51        if let Some(txs_checkpoint) = provider.get_prune_checkpoint(PruneSegment::Transactions)? &&
52            input
53                .previous_checkpoint
54                .is_none_or(|checkpoint| checkpoint.block_number < txs_checkpoint.block_number)
55        {
56            input.previous_checkpoint = Some(txs_checkpoint);
57            debug!(
58                target: "pruner",
59                transactions_checkpoint = ?input.previous_checkpoint,
60                "No TransactionLookup checkpoint found, using Transactions checkpoint as fallback"
61            );
62        }
63
64        let (start, end) = match input.get_next_tx_num_range(provider)? {
65            Some(range) => range,
66            None => {
67                trace!(target: "pruner", "No transaction lookup entries to prune");
68                return Ok(SegmentOutput::done())
69            }
70        }
71        .into_inner();
72        let tx_range = start..=
73            Some(end)
74                .min(input.limiter.deleted_entries_limit_left().map(|left| start + left as u64 - 1))
75                .unwrap();
76        let tx_range_end = *tx_range.end();
77
78        // Retrieve transactions in the range and calculate their hashes in parallel
79        let hashes = provider
80            .transactions_by_tx_range(tx_range.clone())?
81            .into_par_iter()
82            .map(|transaction| transaction.trie_hash())
83            .collect::<Vec<_>>();
84
85        // Number of transactions retrieved from the database should match the tx range count
86        let tx_count = tx_range.count();
87        if hashes.len() != tx_count {
88            return Err(PrunerError::InconsistentData(
89                "Unexpected number of transaction hashes retrieved by transaction number range",
90            ))
91        }
92
93        let mut limiter = input.limiter;
94
95        let mut last_pruned_transaction = None;
96        let (pruned, done) =
97            provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
98                hashes,
99                &mut limiter,
100                |row| {
101                    last_pruned_transaction =
102                        Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
103                },
104            )?;
105
106        let done = done && tx_range_end == end;
107        trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");
108
109        let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);
110
111        let last_pruned_block = provider
112            .transaction_block(last_pruned_transaction)?
113            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
114            // If there's more transaction lookup entries to prune, set the checkpoint block number
115            // to previous, so we could finish pruning its transaction lookup entries on the next
116            // run.
117            .checked_sub(if done { 0 } else { 1 });
118
119        let progress = limiter.progress(done);
120
121        Ok(SegmentOutput {
122            progress,
123            pruned,
124            checkpoint: Some(SegmentOutputCheckpoint {
125                block_number: last_pruned_block,
126                tx_number: Some(last_pruned_transaction),
127            }),
128        })
129    }
130}
131
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use itertools::{
        FoldWhile::{Continue, Done},
        Itertools,
    };
    use reth_db_api::tables;
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    // Prunes the lookup table in three consecutive runs with a deleted-entries limit of 10 and
    // checks the progress, the number of pruned entries, the remaining table size and the saved
    // checkpoint after every run.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Ten blocks with exactly two transactions each.
        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Number the transactions consecutively from zero, in block order.
        let tx_hash_numbers = blocks
            .iter()
            .flat_map(|block| &block.body().transactions)
            .enumerate()
            .map(|(tx_number, transaction)| (*transaction.tx_hash(), tx_number as u64))
            .collect::<Vec<_>>();
        let tx_hash_numbers_len = tx_hash_numbers.len();
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        assert_eq!(
            db.count_entries::<tables::Transactions>().unwrap(),
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
        );
        assert_eq!(
            db.count_entries::<tables::Transactions>().unwrap(),
            db.table::<tables::TransactionHashNumbers>().unwrap().len()
        );

        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = TransactionLookup::new(prune_mode);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);

            // Checkpoint left behind by the previous run, if any.
            let previous_checkpoint = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::TransactionLookup)
                .unwrap();
            let next_tx_number_to_prune = previous_checkpoint
                .as_ref()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map_or(0, |tx_number| tx_number + 1);

            let input = PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() };

            // The run stops either at the last transaction of `to_block` or once the
            // deleted-entries limit is exhausted, whichever comes first.
            let prunable_tx_count = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum::<usize>();
            let limit_bound =
                next_tx_number_to_prune as usize + input.limiter.deleted_entries_limit().unwrap();
            let last_pruned_tx_number = prunable_tx_count.min(limit_bound) - 1;

            // Number of the block that contains the last pruned transaction.
            let (last_pruned_block_number, _) = blocks
                .iter()
                .fold_while((0, 0), |(_, seen_txs), block| {
                    let seen_txs = seen_txs + block.transaction_count();

                    if seen_txs > last_pruned_tx_number {
                        Done((block.number, seen_txs))
                    } else {
                        Continue((block.number, seen_txs))
                    }
                })
                .into_inner();

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            let checkpoint = result.checkpoint.unwrap().as_prune_checkpoint(prune_mode);
            segment.save_checkpoint(&provider, checkpoint).unwrap();
            provider.commit().expect("commit");

            // An unfinished run checkpoints the previous block so that it is revisited.
            let unfinished_offset = if result.progress.is_finished() { 0 } else { 1 };
            let last_pruned_block_number = last_pruned_block_number.checked_sub(unfinished_offset);

            assert_eq!(
                db.table::<tables::TransactionHashNumbers>().unwrap().len(),
                tx_hash_numbers_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        test_prune(
            6,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
        );
        test_prune(6, (PruneProgress::Finished, 2));
        test_prune(10, (PruneProgress::Finished, 8));
    }
}