reth_prune/segments/static_file/
transactions.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{PruneInput, Segment},
4    PrunerError,
5};
6use reth_db_api::{table::Value, tables, transaction::DbTxMut};
7use reth_primitives_traits::NodePrimitives;
8use reth_provider::{
9    providers::StaticFileProvider, BlockReader, DBProvider, StaticFileProviderFactory,
10    TransactionsProvider,
11};
12use reth_prune_types::{
13    PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
14};
15use reth_static_file_types::StaticFileSegment;
16use tracing::trace;
17
18/// The type responsible for pruning transactions in the database and history expiry.
19#[derive(Debug)]
20pub struct Transactions<N> {
21    static_file_provider: StaticFileProvider<N>,
22}
23
24impl<N> Transactions<N> {
25    pub const fn new(static_file_provider: StaticFileProvider<N>) -> Self {
26        Self { static_file_provider }
27    }
28}
29
30impl<Provider> Segment<Provider> for Transactions<Provider::Primitives>
31where
32    Provider: DBProvider<Tx: DbTxMut>
33        + TransactionsProvider
34        + BlockReader
35        + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value>>,
36{
37    fn segment(&self) -> PruneSegment {
38        PruneSegment::Transactions
39    }
40
41    fn mode(&self) -> Option<PruneMode> {
42        self.static_file_provider
43            .get_highest_static_file_block(StaticFileSegment::Transactions)
44            .map(PruneMode::before_inclusive)
45    }
46
47    fn purpose(&self) -> PrunePurpose {
48        PrunePurpose::StaticFile
49    }
50
51    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
52        let tx_range = match input.get_next_tx_num_range(provider)? {
53            Some(range) => range,
54            None => {
55                trace!(target: "pruner", "No transactions to prune");
56                return Ok(SegmentOutput::done())
57            }
58        };
59
60        let mut limiter = input.limiter;
61
62        let mut last_pruned_transaction = *tx_range.end();
63        let (pruned, done) = provider.tx_ref().prune_table_with_range::<tables::Transactions<
64            <Provider::Primitives as NodePrimitives>::SignedTx,
65        >>(
66            tx_range,
67            &mut limiter,
68            |_| false,
69            |row| last_pruned_transaction = row.0,
70        )?;
71        trace!(target: "pruner", %pruned, %done, "Pruned transactions");
72
73        let last_pruned_block = provider
74            .transaction_block(last_pruned_transaction)?
75            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
76            // If there's more transactions to prune, set the checkpoint block number to previous,
77            // so we could finish pruning its transactions on the next run.
78            .checked_sub(if done { 0 } else { 1 });
79
80        let progress = limiter.progress(done);
81
82        Ok(SegmentOutput {
83            progress,
84            pruned,
85            checkpoint: Some(SegmentOutputCheckpoint {
86                block_number: last_pruned_block,
87                tx_number: Some(last_pruned_transaction),
88            }),
89        })
90    }
91}
92
93#[cfg(test)]
94mod tests {
95    use crate::segments::{PruneInput, PruneLimiter, Segment};
96    use alloy_primitives::{BlockNumber, TxNumber, B256};
97    use assert_matches::assert_matches;
98    use itertools::{
99        FoldWhile::{Continue, Done},
100        Itertools,
101    };
102    use reth_db_api::tables;
103    use reth_provider::{
104        DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
105        StaticFileProviderFactory,
106    };
107    use reth_prune_types::{
108        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
109        SegmentOutput,
110    };
111    use reth_stages::test_utils::{StorageKind, TestStageDB};
112    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
113    use std::ops::Sub;
114
115    #[test]
116    fn prune() {
117        let db = TestStageDB::default();
118        let mut rng = generators::rng();
119
120        let blocks = random_block_range(
121            &mut rng,
122            1..=100,
123            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
124        );
125        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
126
127        let transactions =
128            blocks.iter().flat_map(|block| &block.body().transactions).collect::<Vec<_>>();
129
130        assert_eq!(db.table::<tables::Transactions>().unwrap().len(), transactions.len());
131
132        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
133            let segment = super::Transactions::new(db.factory.static_file_provider());
134            let prune_mode = PruneMode::Before(to_block);
135            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
136            let input = PruneInput {
137                previous_checkpoint: db
138                    .factory
139                    .provider()
140                    .unwrap()
141                    .get_prune_checkpoint(PruneSegment::Transactions)
142                    .unwrap(),
143                to_block,
144                limiter: limiter.clone(),
145            };
146
147            let next_tx_number_to_prune = db
148                .factory
149                .provider()
150                .unwrap()
151                .get_prune_checkpoint(PruneSegment::Transactions)
152                .unwrap()
153                .and_then(|checkpoint| checkpoint.tx_number)
154                .map(|tx_number| tx_number + 1)
155                .unwrap_or_default();
156
157            let provider = db.factory.database_provider_rw().unwrap();
158            let result = segment.prune(&provider, input.clone()).unwrap();
159            limiter.increment_deleted_entries_count_by(result.pruned);
160
161            assert_matches!(
162                result,
163                SegmentOutput {progress, pruned, checkpoint: Some(_)}
164                    if (progress, pruned) == expected_result
165            );
166
167            provider
168                .save_prune_checkpoint(
169                    PruneSegment::Transactions,
170                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
171                )
172                .unwrap();
173            provider.commit().expect("commit");
174
175            let last_pruned_tx_number = blocks
176                .iter()
177                .take(to_block as usize)
178                .map(|block| block.transaction_count())
179                .sum::<usize>()
180                .min(
181                    next_tx_number_to_prune as usize +
182                        input.limiter.deleted_entries_limit().unwrap(),
183                )
184                .sub(1);
185
186            let last_pruned_block_number = blocks
187                .iter()
188                .fold_while((0, 0), |(_, mut tx_count), block| {
189                    tx_count += block.transaction_count();
190
191                    if tx_count > last_pruned_tx_number {
192                        Done((block.number, tx_count))
193                    } else {
194                        Continue((block.number, tx_count))
195                    }
196                })
197                .into_inner()
198                .0
199                .checked_sub(if result.progress.is_finished() { 0 } else { 1 });
200
201            assert_eq!(
202                db.table::<tables::Transactions>().unwrap().len(),
203                transactions.len() - (last_pruned_tx_number + 1)
204            );
205            assert_eq!(
206                db.factory
207                    .provider()
208                    .unwrap()
209                    .get_prune_checkpoint(PruneSegment::Transactions)
210                    .unwrap(),
211                Some(PruneCheckpoint {
212                    block_number: last_pruned_block_number,
213                    tx_number: Some(last_pruned_tx_number as TxNumber),
214                    prune_mode
215                })
216            );
217        };
218
219        test_prune(
220            6,
221            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
222        );
223        test_prune(6, (PruneProgress::Finished, 2));
224    }
225}