// reth_prune/segments/static_file/transactions.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{PruneInput, Segment},
4    PrunerError,
5};
6use reth_db_api::{table::Value, tables, transaction::DbTxMut};
7use reth_primitives_traits::NodePrimitives;
8use reth_provider::{
9    providers::StaticFileProvider, BlockReader, DBProvider, StaticFileProviderFactory,
10    TransactionsProvider,
11};
12use reth_prune_types::{
13    PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
14};
15use reth_static_file_types::StaticFileSegment;
16use tracing::trace;
17
18#[derive(Debug)]
19pub struct Transactions<N> {
20    static_file_provider: StaticFileProvider<N>,
21}
22
23impl<N> Transactions<N> {
24    pub const fn new(static_file_provider: StaticFileProvider<N>) -> Self {
25        Self { static_file_provider }
26    }
27}
28
29impl<Provider> Segment<Provider> for Transactions<Provider::Primitives>
30where
31    Provider: DBProvider<Tx: DbTxMut>
32        + TransactionsProvider
33        + BlockReader
34        + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value>>,
35{
36    fn segment(&self) -> PruneSegment {
37        PruneSegment::Transactions
38    }
39
40    fn mode(&self) -> Option<PruneMode> {
41        self.static_file_provider
42            .get_highest_static_file_block(StaticFileSegment::Transactions)
43            .map(PruneMode::before_inclusive)
44    }
45
46    fn purpose(&self) -> PrunePurpose {
47        PrunePurpose::StaticFile
48    }
49
50    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
51        let tx_range = match input.get_next_tx_num_range(provider)? {
52            Some(range) => range,
53            None => {
54                trace!(target: "pruner", "No transactions to prune");
55                return Ok(SegmentOutput::done())
56            }
57        };
58
59        let mut limiter = input.limiter;
60
61        let mut last_pruned_transaction = *tx_range.end();
62        let (pruned, done) = provider.tx_ref().prune_table_with_range::<tables::Transactions<
63            <Provider::Primitives as NodePrimitives>::SignedTx,
64        >>(
65            tx_range,
66            &mut limiter,
67            |_| false,
68            |row| last_pruned_transaction = row.0,
69        )?;
70        trace!(target: "pruner", %pruned, %done, "Pruned transactions");
71
72        let last_pruned_block = provider
73            .transaction_block(last_pruned_transaction)?
74            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
75            // If there's more transactions to prune, set the checkpoint block number to previous,
76            // so we could finish pruning its transactions on the next run.
77            .checked_sub(if done { 0 } else { 1 });
78
79        let progress = limiter.progress(done);
80
81        Ok(SegmentOutput {
82            progress,
83            pruned,
84            checkpoint: Some(SegmentOutputCheckpoint {
85                block_number: last_pruned_block,
86                tx_number: Some(last_pruned_transaction),
87            }),
88        })
89    }
90}
91
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::tables;
    use reth_provider::{
        DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
        StaticFileProviderFactory,
    };
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
        SegmentOutput,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    /// Prunes transactions of 100 random blocks (two transactions each) up to block 6 in two
    /// runs limited to 10 deleted entries, checking the table size and the stored checkpoint
    /// after every run.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=100,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let transactions: Vec<_> =
            blocks.iter().flat_map(|block| &block.body().transactions).collect();

        assert_eq!(db.table::<tables::Transactions>().unwrap().len(), transactions.len());

        // Reads the checkpoint currently stored for the transactions segment.
        let saved_checkpoint = || {
            db.factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::Transactions)
                .unwrap()
        };

        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = super::Transactions::new(db.factory.static_file_provider());

            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
            let input = PruneInput {
                previous_checkpoint: saved_checkpoint(),
                to_block,
                limiter: limiter.clone(),
            };

            // First transaction number this run is expected to delete: one past the previous
            // checkpoint, or zero when there is no checkpoint yet.
            let next_tx_number_to_prune = saved_checkpoint()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map_or(0, |tx_number| tx_number + 1);

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input.clone()).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            provider
                .save_prune_checkpoint(
                    PruneSegment::Transactions,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // Expected last pruned transaction: bounded both by the transactions contained in
            // the first `to_block` blocks and by the deleted entries limit of this run.
            let txs_up_to_target: usize = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum();
            let txs_allowed_by_limit =
                next_tx_number_to_prune as usize + input.limiter.deleted_entries_limit().unwrap();
            let last_pruned_tx_number = txs_up_to_target.min(txs_allowed_by_limit) - 1;

            // Walk the blocks until the cumulative transaction count passes the last pruned
            // transaction number; that block contains it.
            let mut cumulative_tx_count = 0;
            let mut containing_block = 0;
            for block in blocks.iter() {
                cumulative_tx_count += block.transaction_count();
                containing_block = block.number;
                if cumulative_tx_count > last_pruned_tx_number {
                    break
                }
            }
            // An unfinished run is expected to checkpoint the previous block.
            let last_pruned_block_number =
                containing_block.checked_sub(if result.progress.is_finished() { 0 } else { 1 });

            assert_eq!(
                db.table::<tables::Transactions>().unwrap().len(),
                transactions.len() - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                saved_checkpoint(),
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        test_prune(
            6,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
        );
        test_prune(6, (PruneProgress::Finished, 2));
    }
}