reth_prune/segments/static_file/
transactions.rs1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment},
4 PrunerError,
5};
6use reth_db_api::{table::Value, tables, transaction::DbTxMut};
7use reth_primitives_traits::NodePrimitives;
8use reth_provider::{
9 providers::StaticFileProvider, BlockReader, DBProvider, StaticFileProviderFactory,
10 TransactionsProvider,
11};
12use reth_prune_types::{
13 PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
14};
15use reth_static_file_types::StaticFileSegment;
16use tracing::trace;
17
18#[derive(Debug)]
20pub struct Transactions<N> {
21 static_file_provider: StaticFileProvider<N>,
22}
23
24impl<N> Transactions<N> {
25 pub const fn new(static_file_provider: StaticFileProvider<N>) -> Self {
26 Self { static_file_provider }
27 }
28}
29
30impl<Provider> Segment<Provider> for Transactions<Provider::Primitives>
31where
32 Provider: DBProvider<Tx: DbTxMut>
33 + TransactionsProvider
34 + BlockReader
35 + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value>>,
36{
37 fn segment(&self) -> PruneSegment {
38 PruneSegment::Transactions
39 }
40
41 fn mode(&self) -> Option<PruneMode> {
42 self.static_file_provider
43 .get_highest_static_file_block(StaticFileSegment::Transactions)
44 .map(PruneMode::before_inclusive)
45 }
46
47 fn purpose(&self) -> PrunePurpose {
48 PrunePurpose::StaticFile
49 }
50
51 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
52 let tx_range = match input.get_next_tx_num_range(provider)? {
53 Some(range) => range,
54 None => {
55 trace!(target: "pruner", "No transactions to prune");
56 return Ok(SegmentOutput::done())
57 }
58 };
59
60 let mut limiter = input.limiter;
61
62 let mut last_pruned_transaction = *tx_range.end();
63 let (pruned, done) = provider.tx_ref().prune_table_with_range::<tables::Transactions<
64 <Provider::Primitives as NodePrimitives>::SignedTx,
65 >>(
66 tx_range,
67 &mut limiter,
68 |_| false,
69 |row| last_pruned_transaction = row.0,
70 )?;
71 trace!(target: "pruner", %pruned, %done, "Pruned transactions");
72
73 let last_pruned_block = provider
74 .transaction_block(last_pruned_transaction)?
75 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
76 .checked_sub(if done { 0 } else { 1 });
79
80 let progress = limiter.progress(done);
81
82 Ok(SegmentOutput {
83 progress,
84 pruned,
85 checkpoint: Some(SegmentOutputCheckpoint {
86 block_number: last_pruned_block,
87 tx_number: Some(last_pruned_transaction),
88 }),
89 })
90 }
91}
92
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::tables;
    use reth_provider::{
        DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
        StaticFileProviderFactory,
    };
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
        SegmentOutput,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    /// Inserts 100 random blocks with two transactions each, then prunes up to block 6 in two
    /// runs capped at 10 deleted entries, checking the reported progress, the remaining table
    /// size and the stored checkpoint after every run.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=100,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let total_transactions: usize =
            blocks.iter().map(|block| block.body().transactions.len()).sum();

        assert_eq!(db.table::<tables::Transactions>().unwrap().len(), total_transactions);

        let run_and_check = |to_block: BlockNumber, expected: (PruneProgress, usize)| {
            let segment = super::Transactions::new(db.factory.static_file_provider());
            let prune_mode = PruneMode::Before(to_block);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);

            // Checkpoint left by the previous run, if any; the next transaction to prune is the
            // one right after it, or 0 on the first run.
            let previous_checkpoint = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::Transactions)
                .unwrap();
            let next_tx_number_to_prune = previous_checkpoint
                .as_ref()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map(|tx_number| tx_number + 1)
                .unwrap_or_default();

            let input = PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() };

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input.clone()).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected
            );

            provider
                .save_prune_checkpoint(
                    PruneSegment::Transactions,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // Expected last pruned transaction: either the final transaction of the first
            // `to_block` blocks or the one at which the deleted-entries limit is hit, whichever
            // comes first.
            let txs_up_to_target: usize = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum();
            let deleted_entries_limit = input.limiter.deleted_entries_limit().unwrap();
            let limit_bound = next_tx_number_to_prune as usize + deleted_entries_limit;
            let last_pruned_tx_number = txs_up_to_target.min(limit_bound) - 1;

            // Walk the blocks until the running transaction count passes the last pruned
            // transaction; that block holds it.
            let mut cumulative_txs = 0;
            let mut containing_block: BlockNumber = 0;
            for block in &blocks {
                cumulative_txs += block.transaction_count();
                containing_block = block.number;
                if cumulative_txs > last_pruned_tx_number {
                    break
                }
            }
            let last_pruned_block_number =
                containing_block.checked_sub(if result.progress.is_finished() { 0 } else { 1 });

            assert_eq!(
                db.table::<tables::Transactions>().unwrap().len(),
                total_transactions - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::Transactions)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        run_and_check(
            6,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
        );
        run_and_check(6, (PruneProgress::Finished, 2));
    }
}