reth_prune/segments/user/transaction_lookup.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{PruneInput, Segment, SegmentOutput},
4    PrunerError,
5};
6use alloy_eips::eip2718::Encodable2718;
7use rayon::prelude::*;
8use reth_db_api::{tables, transaction::DbTxMut};
9use reth_provider::{BlockReader, DBProvider, PruneCheckpointReader, StaticFileProviderFactory};
10use reth_prune_types::{
11    PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint,
12};
13use reth_static_file_types::StaticFileSegment;
14use tracing::{debug, instrument, trace};
15
16#[derive(Debug)]
17pub struct TransactionLookup {
18    mode: PruneMode,
19}
20
21impl TransactionLookup {
22    pub const fn new(mode: PruneMode) -> Self {
23        Self { mode }
24    }
25}
26
27impl<Provider> Segment<Provider> for TransactionLookup
28where
29    Provider: DBProvider<Tx: DbTxMut>
30        + BlockReader<Transaction: Encodable2718>
31        + PruneCheckpointReader
32        + StaticFileProviderFactory,
33{
34    fn segment(&self) -> PruneSegment {
35        PruneSegment::TransactionLookup
36    }
37
38    fn mode(&self) -> Option<PruneMode> {
39        Some(self.mode)
40    }
41
42    fn purpose(&self) -> PrunePurpose {
43        PrunePurpose::User
44    }
45
46    #[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
47    fn prune(
48        &self,
49        provider: &Provider,
50        mut input: PruneInput,
51    ) -> Result<SegmentOutput, PrunerError> {
52        // It is not possible to prune TransactionLookup data for which we don't have transaction
53        // data. If the TransactionLookup checkpoint is lagging behind (which can happen e.g. when
54        // pre-merge history is dropped and then later tx lookup pruning is enabled) then we can
55        // only prune from the lowest static file.
56        if let Some(lowest_range) =
57            provider.static_file_provider().get_lowest_range(StaticFileSegment::Transactions) &&
58            input
59                .previous_checkpoint
60                .is_none_or(|checkpoint| checkpoint.block_number < Some(lowest_range.start()))
61        {
62            let new_checkpoint = lowest_range.start().saturating_sub(1);
63            if let Some(body_indices) = provider.block_body_indices(new_checkpoint)? {
64                input.previous_checkpoint = Some(PruneCheckpoint {
65                    block_number: Some(new_checkpoint),
66                    tx_number: Some(body_indices.last_tx_num()),
67                    prune_mode: self.mode,
68                });
69                debug!(
70                    target: "pruner",
71                    static_file_checkpoint = ?input.previous_checkpoint,
72                    "Using static file transaction checkpoint as TransactionLookup starting point"
73                );
74            }
75        }
76
77        let (start, end) = match input.get_next_tx_num_range(provider)? {
78            Some(range) => range,
79            None => {
80                trace!(target: "pruner", "No transaction lookup entries to prune");
81                return Ok(SegmentOutput::done())
82            }
83        }
84        .into_inner();
85        let tx_range = start..=
86            Some(end)
87                .min(input.limiter.deleted_entries_limit_left().map(|left| start + left as u64 - 1))
88                .unwrap();
89        let tx_range_end = *tx_range.end();
90
91        // Retrieve transactions in the range and calculate their hashes in parallel
92        let hashes = provider
93            .transactions_by_tx_range(tx_range.clone())?
94            .into_par_iter()
95            .map(|transaction| transaction.trie_hash())
96            .collect::<Vec<_>>();
97
98        // Number of transactions retrieved from the database should match the tx range count
99        let tx_count = tx_range.count();
100        if hashes.len() != tx_count {
101            return Err(PrunerError::InconsistentData(
102                "Unexpected number of transaction hashes retrieved by transaction number range",
103            ))
104        }
105
106        let mut limiter = input.limiter;
107
108        let mut last_pruned_transaction = None;
109        let (pruned, done) =
110            provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
111                hashes,
112                &mut limiter,
113                |row| {
114                    last_pruned_transaction =
115                        Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
116                },
117            )?;
118
119        let done = done && tx_range_end == end;
120        trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");
121
122        let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);
123
124        let last_pruned_block = provider
125            .transaction_block(last_pruned_transaction)?
126            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
127            // If there's more transaction lookup entries to prune, set the checkpoint block number
128            // to previous, so we could finish pruning its transaction lookup entries on the next
129            // run.
130            .checked_sub(if done { 0 } else { 1 });
131
132        let progress = limiter.progress(done);
133
134        Ok(SegmentOutput {
135            progress,
136            pruned,
137            checkpoint: Some(SegmentOutputCheckpoint {
138                block_number: last_pruned_block,
139                tx_number: Some(last_pruned_transaction),
140            }),
141        })
142    }
143}
144
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::tables;
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    /// Runs the segment several times with a deleted entries limit of 10 and, after each run,
    /// checks the reported progress, the number of remaining lookup rows and the saved
    /// checkpoint.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Blocks 1..=10; `tx_count: 2..3` presumably yields two transactions per block, which is
        // what the expected pruned counts below rely on.
        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Pair every transaction hash with its sequential transaction number, in block order.
        let tx_hash_numbers = blocks
            .iter()
            .flat_map(|block| block.body().transactions.iter())
            .enumerate()
            .map(|(tx_number, transaction)| (*transaction.tx_hash(), tx_number as u64))
            .collect::<Vec<_>>();
        let tx_hash_numbers_len = tx_hash_numbers.len();
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        let total_transactions =
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>();
        assert_eq!(db.count_entries::<tables::Transactions>().unwrap(), total_transactions);
        assert_eq!(
            db.count_entries::<tables::Transactions>().unwrap(),
            db.table::<tables::TransactionHashNumbers>().unwrap().len()
        );

        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = TransactionLookup::new(prune_mode);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);

            // Reads the currently persisted TransactionLookup checkpoint.
            let saved_checkpoint = || {
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap()
            };

            let input = PruneInput {
                previous_checkpoint: saved_checkpoint(),
                to_block,
                limiter: limiter.clone(),
            };

            // First transaction number this run is expected to touch.
            let first_tx_to_prune = saved_checkpoint()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map_or(0, |tx_number| tx_number + 1);

            // Highest transaction number expected to be pruned: the last transaction of the
            // first `to_block` blocks, capped by the deleted entries limit.
            let eligible_transactions = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum::<usize>();
            let last_pruned_tx_number = eligible_transactions
                .min(first_tx_to_prune as usize + input.limiter.deleted_entries_limit().unwrap()) -
                1;

            // Number of the block holding `last_pruned_tx_number` (the last block if none does).
            let mut seen_transactions = 0;
            let mut last_pruned_block_number = 0;
            for block in &blocks {
                seen_transactions += block.transaction_count();
                last_pruned_block_number = block.number;
                if seen_transactions > last_pruned_tx_number {
                    break
                }
            }

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // An unfinished run leaves its checkpoint one block behind the last pruned block.
            let expected_checkpoint_block = last_pruned_block_number
                .checked_sub(if result.progress.is_finished() { 0 } else { 1 });

            assert_eq!(
                db.table::<tables::TransactionHashNumbers>().unwrap().len(),
                tx_hash_numbers_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                saved_checkpoint(),
                Some(PruneCheckpoint {
                    block_number: expected_checkpoint_block,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        test_prune(
            6,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
        );
        test_prune(6, (PruneProgress::Finished, 2));
        test_prune(10, (PruneProgress::Finished, 8));
    }
}