reth_prune/segments/user/
transaction_lookup.rs1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment, SegmentOutput},
4 PrunerError,
5};
6use alloy_eips::eip2718::Encodable2718;
7use rayon::prelude::*;
8use reth_db_api::{tables, transaction::DbTxMut};
9use reth_provider::{BlockReader, DBProvider, PruneCheckpointReader, StaticFileProviderFactory};
10use reth_prune_types::{
11 PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutputCheckpoint,
12};
13use reth_static_file_types::StaticFileSegment;
14use tracing::{debug, instrument, trace};
15
16#[derive(Debug)]
17pub struct TransactionLookup {
18 mode: PruneMode,
19}
20
21impl TransactionLookup {
22 pub const fn new(mode: PruneMode) -> Self {
23 Self { mode }
24 }
25}
26
27impl<Provider> Segment<Provider> for TransactionLookup
28where
29 Provider: DBProvider<Tx: DbTxMut>
30 + BlockReader<Transaction: Encodable2718>
31 + PruneCheckpointReader
32 + StaticFileProviderFactory,
33{
34 fn segment(&self) -> PruneSegment {
35 PruneSegment::TransactionLookup
36 }
37
38 fn mode(&self) -> Option<PruneMode> {
39 Some(self.mode)
40 }
41
42 fn purpose(&self) -> PrunePurpose {
43 PrunePurpose::User
44 }
45
46 #[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
47 fn prune(
48 &self,
49 provider: &Provider,
50 mut input: PruneInput,
51 ) -> Result<SegmentOutput, PrunerError> {
52 if let Some(lowest_range) =
57 provider.static_file_provider().get_lowest_range(StaticFileSegment::Transactions) &&
58 input
59 .previous_checkpoint
60 .is_none_or(|checkpoint| checkpoint.block_number < Some(lowest_range.start()))
61 {
62 let new_checkpoint = lowest_range.start().saturating_sub(1);
63 if let Some(body_indices) = provider.block_body_indices(new_checkpoint)? {
64 input.previous_checkpoint = Some(PruneCheckpoint {
65 block_number: Some(new_checkpoint),
66 tx_number: Some(body_indices.last_tx_num()),
67 prune_mode: self.mode,
68 });
69 debug!(
70 target: "pruner",
71 static_file_checkpoint = ?input.previous_checkpoint,
72 "Using static file transaction checkpoint as TransactionLookup starting point"
73 );
74 }
75 }
76
77 let (start, end) = match input.get_next_tx_num_range(provider)? {
78 Some(range) => range,
79 None => {
80 trace!(target: "pruner", "No transaction lookup entries to prune");
81 return Ok(SegmentOutput::done())
82 }
83 }
84 .into_inner();
85
86 if self.mode.is_full() {
88 let pruned = provider.tx_ref().clear_table::<tables::TransactionHashNumbers>()?;
89 trace!(target: "pruner", %pruned, "Cleared transaction lookup table");
90
91 let last_pruned_block = provider
92 .block_by_transaction_id(end)?
93 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;
94
95 return Ok(SegmentOutput {
96 progress: PruneProgress::Finished,
97 pruned,
98 checkpoint: Some(SegmentOutputCheckpoint {
99 block_number: Some(last_pruned_block),
100 tx_number: Some(end),
101 }),
102 });
103 }
104
105 let tx_range = start..=
106 Some(end)
107 .min(
108 input
109 .limiter
110 .deleted_entries_limit_left()
111 .map(|left| start.saturating_add(left as u64) - 1),
114 )
115 .unwrap();
116 let tx_range_end = *tx_range.end();
117
118 let mut hashes = provider
120 .transactions_by_tx_range(tx_range.clone())?
121 .into_par_iter()
122 .map(|transaction| transaction.trie_hash())
123 .collect::<Vec<_>>();
124
125 hashes.sort_unstable();
129
130 let tx_count = tx_range.count();
132 if hashes.len() != tx_count {
133 return Err(PrunerError::InconsistentData(
134 "Unexpected number of transaction hashes retrieved by transaction number range",
135 ))
136 }
137
138 let mut limiter = input.limiter;
139
140 let mut last_pruned_transaction = None;
141 let (pruned, done) =
142 provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
143 hashes,
144 &mut limiter,
145 |row| {
146 last_pruned_transaction =
147 Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
148 },
149 )?;
150
151 let done = done && tx_range_end == end;
152 trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");
153
154 let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);
155
156 let last_pruned_block = provider
157 .block_by_transaction_id(last_pruned_transaction)?
158 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
159 .checked_sub(if done { 0 } else { 1 });
163
164 let progress = limiter.progress(done);
165
166 Ok(SegmentOutput {
167 progress,
168 pruned,
169 checkpoint: Some(SegmentOutputCheckpoint {
170 block_number: last_pruned_block,
171 tx_number: Some(last_pruned_transaction),
172 }),
173 })
174 }
175}
176
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use itertools::{
        FoldWhile::{Continue, Done},
        Itertools,
    };
    use reth_db_api::tables;
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
    use std::ops::Sub;

    /// Prunes the transaction lookup table in three rounds (limit of 10 deletions per round)
    /// and checks the reported progress, the remaining table size and the stored checkpoint
    /// after each round.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Ten blocks (numbers 1..=10), each with exactly two transactions.
        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Pair every transaction hash with its sequential transaction number, in block order.
        let tx_hash_numbers = blocks
            .iter()
            .flat_map(|block| block.body().transactions.iter())
            .enumerate()
            .map(|(tx_number, transaction)| (*transaction.tx_hash(), tx_number as u64))
            .collect::<Vec<_>>();
        let tx_hash_numbers_len = tx_hash_numbers.len();
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        // Sanity checks on the fixture before any pruning happens.
        let total_block_txs: usize = blocks.iter().map(|block| block.transaction_count()).sum();
        assert_eq!(db.count_entries::<tables::Transactions>().unwrap(), total_block_txs);
        assert_eq!(
            db.count_entries::<tables::Transactions>().unwrap(),
            db.table::<tables::TransactionHashNumbers>().unwrap().len()
        );

        // Runs one pruning round up to `to_block` and verifies its outcome.
        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            // Reads the currently persisted checkpoint of this segment.
            let load_checkpoint = || {
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap()
            };

            let prune_mode = PruneMode::Before(to_block);
            let segment = TransactionLookup::new(prune_mode);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
            let input = PruneInput {
                previous_checkpoint: load_checkpoint(),
                to_block,
                limiter: limiter.clone(),
            };

            // First transaction number this round is expected to start from.
            let next_tx_number_to_prune = match load_checkpoint()
                .and_then(|checkpoint| checkpoint.tx_number)
            {
                Some(tx_number) => tx_number + 1,
                None => Default::default(),
            };

            // Expected last pruned transaction: bounded both by the target block and by the
            // per-round deletion limit.
            let txs_up_to_target: usize = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum();
            let deletion_limit = input.limiter.deleted_entries_limit().unwrap();
            let last_pruned_tx_number =
                txs_up_to_target.min(next_tx_number_to_prune as usize + deletion_limit).sub(1);

            // Block that contains the expected last pruned transaction.
            let (block_of_last_pruned_tx, _) = blocks
                .iter()
                .fold_while((0, 0), |(_, seen_txs), block| {
                    let seen_txs = seen_txs + block.transaction_count();

                    if seen_txs > last_pruned_tx_number {
                        Done((block.number, seen_txs))
                    } else {
                        Continue((block.number, seen_txs))
                    }
                })
                .into_inner();

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            let checkpoint_to_save = result.checkpoint.unwrap().as_prune_checkpoint(prune_mode);
            segment.save_checkpoint(&provider, checkpoint_to_save).unwrap();
            provider.commit().expect("commit");

            // An unfinished round is expected to record the previous block in its checkpoint.
            let expected_checkpoint_block = block_of_last_pruned_tx
                .checked_sub(if result.progress.is_finished() { 0 } else { 1 });

            assert_eq!(
                db.table::<tables::TransactionHashNumbers>().unwrap().len(),
                tx_hash_numbers_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                load_checkpoint(),
                Some(PruneCheckpoint {
                    block_number: expected_checkpoint_block,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        // (target block, (expected progress, expected number of pruned entries))
        let rounds = [
            (
                6,
                (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
            ),
            (6, (PruneProgress::Finished, 2)),
            (10, (PruneProgress::Finished, 8)),
        ];
        for (to_block, expected_result) in rounds {
            test_prune(to_block, expected_result);
        }
    }
}