// reth_prune/segments/user/transaction_lookup.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{PruneInput, Segment, SegmentOutput},
4    PrunerError,
5};
6use alloy_eips::eip2718::Encodable2718;
7use rayon::prelude::*;
8use reth_db_api::{tables, transaction::DbTxMut};
9use reth_provider::{BlockReader, DBProvider};
10use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
11use tracing::{instrument, trace};
12
13#[derive(Debug)]
14pub struct TransactionLookup {
15    mode: PruneMode,
16}
17
18impl TransactionLookup {
19    pub const fn new(mode: PruneMode) -> Self {
20        Self { mode }
21    }
22}
23
24impl<Provider> Segment<Provider> for TransactionLookup
25where
26    Provider: DBProvider<Tx: DbTxMut> + BlockReader<Transaction: Encodable2718>,
27{
28    fn segment(&self) -> PruneSegment {
29        PruneSegment::TransactionLookup
30    }
31
32    fn mode(&self) -> Option<PruneMode> {
33        Some(self.mode)
34    }
35
36    fn purpose(&self) -> PrunePurpose {
37        PrunePurpose::User
38    }
39
40    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
41    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
42        let (start, end) = match input.get_next_tx_num_range(provider)? {
43            Some(range) => range,
44            None => {
45                trace!(target: "pruner", "No transaction lookup entries to prune");
46                return Ok(SegmentOutput::done())
47            }
48        }
49        .into_inner();
50        let tx_range = start..=
51            Some(end)
52                .min(input.limiter.deleted_entries_limit_left().map(|left| start + left as u64 - 1))
53                .unwrap();
54        let tx_range_end = *tx_range.end();
55
56        // Retrieve transactions in the range and calculate their hashes in parallel
57        let hashes = provider
58            .transactions_by_tx_range(tx_range.clone())?
59            .into_par_iter()
60            .map(|transaction| transaction.trie_hash())
61            .collect::<Vec<_>>();
62
63        // Number of transactions retrieved from the database should match the tx range count
64        let tx_count = tx_range.count();
65        if hashes.len() != tx_count {
66            return Err(PrunerError::InconsistentData(
67                "Unexpected number of transaction hashes retrieved by transaction number range",
68            ))
69        }
70
71        let mut limiter = input.limiter;
72
73        let mut last_pruned_transaction = None;
74        let (pruned, done) =
75            provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
76                hashes,
77                &mut limiter,
78                |row| {
79                    last_pruned_transaction =
80                        Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
81                },
82            )?;
83
84        let done = done && tx_range_end == end;
85        trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");
86
87        let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);
88
89        let last_pruned_block = provider
90            .transaction_block(last_pruned_transaction)?
91            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
92            // If there's more transaction lookup entries to prune, set the checkpoint block number
93            // to previous, so we could finish pruning its transaction lookup entries on the next
94            // run.
95            .checked_sub(if done { 0 } else { 1 });
96
97        let progress = limiter.progress(done);
98
99        Ok(SegmentOutput {
100            progress,
101            pruned,
102            checkpoint: Some(SegmentOutputCheckpoint {
103                block_number: last_pruned_block,
104                tx_number: Some(last_pruned_transaction),
105            }),
106        })
107    }
108}
109
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use itertools::{
        FoldWhile::{Continue, Done},
        Itertools,
    };
    use reth_db_api::tables;
    use reth_primitives_traits::SignedTransaction;
    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    /// Prunes the transaction lookup table in limited batches and checks the remaining row
    /// count and the saved checkpoint after every run.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Pair every transaction hash with its global transaction number, in block order.
        let tx_hash_numbers: Vec<_> = blocks
            .iter()
            .flat_map(|block| block.body().transactions.iter())
            .enumerate()
            .map(|(tx_number, transaction)| (*transaction.tx_hash(), tx_number as u64))
            .collect();
        let tx_hash_numbers_len = tx_hash_numbers.len();
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        let total_transactions: usize =
            blocks.iter().map(|block| block.transaction_count()).sum();
        assert_eq!(db.table::<tables::Transactions>().unwrap().len(), total_transactions);
        assert_eq!(
            db.table::<tables::Transactions>().unwrap().len(),
            db.table::<tables::TransactionHashNumbers>().unwrap().len()
        );

        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = TransactionLookup::new(prune_mode);

            let previous_checkpoint = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::TransactionLookup)
                .unwrap();
            // Pruning resumes right after the highest transaction stored in the checkpoint.
            let next_tx_number_to_prune = previous_checkpoint
                .as_ref()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map_or(0, |tx_number| tx_number + 1);

            let input = PruneInput {
                previous_checkpoint,
                to_block,
                limiter: PruneLimiter::default().set_deleted_entries_limit(10),
            };

            // Expected highest pruned transaction: everything up to `to_block`, capped by the
            // number of deletions allowed in this run.
            let transactions_up_to_block: usize =
                blocks.iter().take(to_block as usize).map(|block| block.transaction_count()).sum();
            let deletion_limit = input.limiter.deleted_entries_limit().unwrap();
            let last_pruned_tx_number =
                transactions_up_to_block.min(next_tx_number_to_prune as usize + deletion_limit) -
                    1;

            // Block that contains `last_pruned_tx_number` (the last block if none does).
            let last_pruned_block_number = blocks
                .iter()
                .fold_while((0, 0), |(_, seen), block| {
                    let seen = seen + block.transaction_count();
                    let state = (block.number, seen);
                    if seen > last_pruned_tx_number {
                        Done(state)
                    } else {
                        Continue(state)
                    }
                })
                .into_inner()
                .0;

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            let checkpoint = result.checkpoint.unwrap().as_prune_checkpoint(prune_mode);
            segment.save_checkpoint(&provider, checkpoint).unwrap();
            provider.commit().expect("commit");

            // An unfinished run steps the checkpoint back one block so it gets revisited.
            let expected_checkpoint_block = last_pruned_block_number
                .checked_sub(u64::from(!result.progress.is_finished()));

            assert_eq!(
                db.table::<tables::TransactionHashNumbers>().unwrap().len(),
                tx_hash_numbers_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: expected_checkpoint_block,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        test_prune(
            6,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
        );
        test_prune(6, (PruneProgress::Finished, 2));
        test_prune(10, (PruneProgress::Finished, 8));
    }
}