reth_prune/segments/user/
transaction_lookup.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{PruneInput, Segment, SegmentOutput},
4    PrunerError,
5};
6use alloy_eips::eip2718::Encodable2718;
7use rayon::prelude::*;
8use reth_db_api::{tables, transaction::DbTxMut};
9use reth_provider::{BlockReader, DBProvider};
10use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
11use tracing::{instrument, trace};
12
13#[derive(Debug)]
14pub struct TransactionLookup {
15    mode: PruneMode,
16}
17
18impl TransactionLookup {
19    pub const fn new(mode: PruneMode) -> Self {
20        Self { mode }
21    }
22}
23
24impl<Provider> Segment<Provider> for TransactionLookup
25where
26    Provider: DBProvider<Tx: DbTxMut> + BlockReader<Transaction: Encodable2718>,
27{
28    fn segment(&self) -> PruneSegment {
29        PruneSegment::TransactionLookup
30    }
31
32    fn mode(&self) -> Option<PruneMode> {
33        Some(self.mode)
34    }
35
36    fn purpose(&self) -> PrunePurpose {
37        PrunePurpose::User
38    }
39
40    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
41    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
42        let (start, end) = match input.get_next_tx_num_range(provider)? {
43            Some(range) => range,
44            None => {
45                trace!(target: "pruner", "No transaction lookup entries to prune");
46                return Ok(SegmentOutput::done())
47            }
48        }
49        .into_inner();
50        let tx_range = start..=
51            Some(end)
52                .min(input.limiter.deleted_entries_limit_left().map(|left| start + left as u64 - 1))
53                .unwrap();
54        let tx_range_end = *tx_range.end();
55
56        // Retrieve transactions in the range and calculate their hashes in parallel
57        let hashes = provider
58            .transactions_by_tx_range(tx_range.clone())?
59            .into_par_iter()
60            .map(|transaction| transaction.trie_hash())
61            .collect::<Vec<_>>();
62
63        // Number of transactions retrieved from the database should match the tx range count
64        let tx_count = tx_range.count();
65        if hashes.len() != tx_count {
66            return Err(PrunerError::InconsistentData(
67                "Unexpected number of transaction hashes retrieved by transaction number range",
68            ))
69        }
70
71        let mut limiter = input.limiter;
72
73        let mut last_pruned_transaction = None;
74        let (pruned, done) =
75            provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
76                hashes,
77                &mut limiter,
78                |row| {
79                    last_pruned_transaction =
80                        Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
81                },
82            )?;
83
84        let done = done && tx_range_end == end;
85        trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");
86
87        let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);
88
89        let last_pruned_block = provider
90            .transaction_block(last_pruned_transaction)?
91            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
92            // If there's more transaction lookup entries to prune, set the checkpoint block number
93            // to previous, so we could finish pruning its transaction lookup entries on the next
94            // run.
95            .checked_sub(if done { 0 } else { 1 });
96
97        let progress = limiter.progress(done);
98
99        Ok(SegmentOutput {
100            progress,
101            pruned,
102            checkpoint: Some(SegmentOutputCheckpoint {
103                block_number: last_pruned_block,
104                tx_number: Some(last_pruned_transaction),
105            }),
106        })
107    }
108}
109
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use itertools::{
        FoldWhile::{Continue, Done},
        Itertools,
    };
    use reth_db_api::tables;
    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    /// Prunes the transaction lookup table in three consecutive runs and checks, after each
    /// run, the reported progress, the number of remaining table entries and the saved
    /// checkpoint.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Blocks 1..=10; the expectations at the bottom assume two transactions per block.
        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Number the transactions consecutively from zero, in block order.
        let tx_hash_numbers = blocks
            .iter()
            .flat_map(|block| block.body().transactions.iter())
            .enumerate()
            .map(|(index, transaction)| (*transaction.tx_hash(), index as u64))
            .collect::<Vec<_>>();
        let tx_hash_numbers_len = tx_hash_numbers.len();
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        // Sanity check: both tables hold one entry per generated transaction.
        let total_transactions =
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>();
        assert_eq!(db.table::<tables::Transactions>().unwrap().len(), total_transactions);
        assert_eq!(
            db.table::<tables::Transactions>().unwrap().len(),
            db.table::<tables::TransactionHashNumbers>().unwrap().len()
        );

        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = TransactionLookup::new(prune_mode);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);

            // Checkpoint left behind by the previous run, if any.
            let previous_checkpoint = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::TransactionLookup)
                .unwrap();
            let next_tx_number_to_prune = previous_checkpoint
                .as_ref()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map_or(0, |tx_number| tx_number + 1);

            let input = PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() };

            // The run stops either at the last transaction of the target blocks or once the
            // deletion limit is used up, whichever comes first.
            let txs_up_to_target = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum::<usize>();
            let txs_allowed_by_limit =
                next_tx_number_to_prune as usize + input.limiter.deleted_entries_limit().unwrap();
            let last_pruned_tx_number = txs_up_to_target.min(txs_allowed_by_limit) - 1;

            // Block that contains the last transaction expected to be pruned.
            let last_pruned_block_number = blocks
                .iter()
                .fold_while((0, 0), |(_, seen_txs), block| {
                    let seen_txs = seen_txs + block.transaction_count();
                    let state = (block.number, seen_txs);

                    if seen_txs > last_pruned_tx_number {
                        Done(state)
                    } else {
                        Continue(state)
                    }
                })
                .into_inner()
                .0;

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            let checkpoint = result.checkpoint.unwrap().as_prune_checkpoint(prune_mode);
            segment.save_checkpoint(&provider, checkpoint).unwrap();
            provider.commit().expect("commit");

            // An unfinished run checkpoints the previous block.
            let block_offset = if result.progress.is_finished() { 0 } else { 1 };
            let last_pruned_block_number = last_pruned_block_number.checked_sub(block_offset);

            assert_eq!(
                db.table::<tables::TransactionHashNumbers>().unwrap().len(),
                tx_hash_numbers_len - (last_pruned_tx_number + 1)
            );

            let saved_checkpoint = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::TransactionLookup)
                .unwrap();
            assert_eq!(
                saved_checkpoint,
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        // First run is interrupted by the limit of 10; the second run prunes the remaining 2
        // entries for the same target; the last run prunes the 8 entries up to block 10.
        test_prune(
            6,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
        );
        test_prune(6, (PruneProgress::Finished, 2));
        test_prune(10, (PruneProgress::Finished, 8));
    }
}