reth_prune/segments/static_file/
transactions.rs
1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment},
4 PrunerError,
5};
6use reth_db_api::{table::Value, tables, transaction::DbTxMut};
7use reth_primitives_traits::NodePrimitives;
8use reth_provider::{
9 providers::StaticFileProvider, BlockReader, DBProvider, StaticFileProviderFactory,
10 TransactionsProvider,
11};
12use reth_prune_types::{
13 PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
14};
15use reth_static_file_types::StaticFileSegment;
16use tracing::trace;
17
18#[derive(Debug)]
19pub struct Transactions<N> {
20 static_file_provider: StaticFileProvider<N>,
21}
22
23impl<N> Transactions<N> {
24 pub const fn new(static_file_provider: StaticFileProvider<N>) -> Self {
25 Self { static_file_provider }
26 }
27}
28
29impl<Provider> Segment<Provider> for Transactions<Provider::Primitives>
30where
31 Provider: DBProvider<Tx: DbTxMut>
32 + TransactionsProvider
33 + BlockReader
34 + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value>>,
35{
36 fn segment(&self) -> PruneSegment {
37 PruneSegment::Transactions
38 }
39
40 fn mode(&self) -> Option<PruneMode> {
41 self.static_file_provider
42 .get_highest_static_file_block(StaticFileSegment::Transactions)
43 .map(PruneMode::before_inclusive)
44 }
45
46 fn purpose(&self) -> PrunePurpose {
47 PrunePurpose::StaticFile
48 }
49
50 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
51 let tx_range = match input.get_next_tx_num_range(provider)? {
52 Some(range) => range,
53 None => {
54 trace!(target: "pruner", "No transactions to prune");
55 return Ok(SegmentOutput::done())
56 }
57 };
58
59 let mut limiter = input.limiter;
60
61 let mut last_pruned_transaction = *tx_range.end();
62 let (pruned, done) = provider.tx_ref().prune_table_with_range::<tables::Transactions<
63 <Provider::Primitives as NodePrimitives>::SignedTx,
64 >>(
65 tx_range,
66 &mut limiter,
67 |_| false,
68 |row| last_pruned_transaction = row.0,
69 )?;
70 trace!(target: "pruner", %pruned, %done, "Pruned transactions");
71
72 let last_pruned_block = provider
73 .transaction_block(last_pruned_transaction)?
74 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
75 .checked_sub(if done { 0 } else { 1 });
78
79 let progress = limiter.progress(done);
80
81 Ok(SegmentOutput {
82 progress,
83 pruned,
84 checkpoint: Some(SegmentOutputCheckpoint {
85 block_number: last_pruned_block,
86 tx_number: Some(last_pruned_transaction),
87 }),
88 })
89 }
90}
91
92#[cfg(test)]
93mod tests {
94 use crate::segments::{PruneInput, PruneLimiter, Segment};
95 use alloy_primitives::{BlockNumber, TxNumber, B256};
96 use assert_matches::assert_matches;
97 use itertools::{
98 FoldWhile::{Continue, Done},
99 Itertools,
100 };
101 use reth_db_api::tables;
102 use reth_provider::{
103 DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
104 StaticFileProviderFactory,
105 };
106 use reth_prune_types::{
107 PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
108 SegmentOutput,
109 };
110 use reth_stages::test_utils::{StorageKind, TestStageDB};
111 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
112 use std::ops::Sub;
113
114 #[test]
115 fn prune() {
116 let db = TestStageDB::default();
117 let mut rng = generators::rng();
118
119 let blocks = random_block_range(
120 &mut rng,
121 1..=100,
122 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
123 );
124 db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
125
126 let transactions =
127 blocks.iter().flat_map(|block| &block.body().transactions).collect::<Vec<_>>();
128
129 assert_eq!(db.table::<tables::Transactions>().unwrap().len(), transactions.len());
130
131 let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
132 let segment = super::Transactions::new(db.factory.static_file_provider());
133 let prune_mode = PruneMode::Before(to_block);
134 let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
135 let input = PruneInput {
136 previous_checkpoint: db
137 .factory
138 .provider()
139 .unwrap()
140 .get_prune_checkpoint(PruneSegment::Transactions)
141 .unwrap(),
142 to_block,
143 limiter: limiter.clone(),
144 };
145
146 let next_tx_number_to_prune = db
147 .factory
148 .provider()
149 .unwrap()
150 .get_prune_checkpoint(PruneSegment::Transactions)
151 .unwrap()
152 .and_then(|checkpoint| checkpoint.tx_number)
153 .map(|tx_number| tx_number + 1)
154 .unwrap_or_default();
155
156 let provider = db.factory.database_provider_rw().unwrap();
157 let result = segment.prune(&provider, input.clone()).unwrap();
158 limiter.increment_deleted_entries_count_by(result.pruned);
159
160 assert_matches!(
161 result,
162 SegmentOutput {progress, pruned, checkpoint: Some(_)}
163 if (progress, pruned) == expected_result
164 );
165
166 provider
167 .save_prune_checkpoint(
168 PruneSegment::Transactions,
169 result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
170 )
171 .unwrap();
172 provider.commit().expect("commit");
173
174 let last_pruned_tx_number = blocks
175 .iter()
176 .take(to_block as usize)
177 .map(|block| block.transaction_count())
178 .sum::<usize>()
179 .min(
180 next_tx_number_to_prune as usize +
181 input.limiter.deleted_entries_limit().unwrap(),
182 )
183 .sub(1);
184
185 let last_pruned_block_number = blocks
186 .iter()
187 .fold_while((0, 0), |(_, mut tx_count), block| {
188 tx_count += block.transaction_count();
189
190 if tx_count > last_pruned_tx_number {
191 Done((block.number, tx_count))
192 } else {
193 Continue((block.number, tx_count))
194 }
195 })
196 .into_inner()
197 .0
198 .checked_sub(if result.progress.is_finished() { 0 } else { 1 });
199
200 assert_eq!(
201 db.table::<tables::Transactions>().unwrap().len(),
202 transactions.len() - (last_pruned_tx_number + 1)
203 );
204 assert_eq!(
205 db.factory
206 .provider()
207 .unwrap()
208 .get_prune_checkpoint(PruneSegment::Transactions)
209 .unwrap(),
210 Some(PruneCheckpoint {
211 block_number: last_pruned_block_number,
212 tx_number: Some(last_pruned_tx_number as TxNumber),
213 prune_mode
214 })
215 );
216 };
217
218 test_prune(
219 6,
220 (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
221 );
222 test_prune(6, (PruneProgress::Finished, 2));
223 }
224}