reth_prune/segments/user/
transaction_lookup.rs1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment, SegmentOutput},
4 PrunerError,
5};
6use alloy_eips::eip2718::Encodable2718;
7use rayon::prelude::*;
8use reth_db_api::{tables, transaction::DbTxMut};
9use reth_provider::{BlockReader, DBProvider, PruneCheckpointReader, StaticFileProviderFactory};
10use reth_prune_types::{
11 PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint,
12};
13use reth_static_file_types::StaticFileSegment;
14use tracing::{debug, instrument, trace};
15
16#[derive(Debug)]
17pub struct TransactionLookup {
18 mode: PruneMode,
19}
20
21impl TransactionLookup {
22 pub const fn new(mode: PruneMode) -> Self {
23 Self { mode }
24 }
25}
26
27impl<Provider> Segment<Provider> for TransactionLookup
28where
29 Provider: DBProvider<Tx: DbTxMut>
30 + BlockReader<Transaction: Encodable2718>
31 + PruneCheckpointReader
32 + StaticFileProviderFactory,
33{
34 fn segment(&self) -> PruneSegment {
35 PruneSegment::TransactionLookup
36 }
37
38 fn mode(&self) -> Option<PruneMode> {
39 Some(self.mode)
40 }
41
42 fn purpose(&self) -> PrunePurpose {
43 PrunePurpose::User
44 }
45
46 #[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
47 fn prune(
48 &self,
49 provider: &Provider,
50 mut input: PruneInput,
51 ) -> Result<SegmentOutput, PrunerError> {
52 if let Some(lowest_range) =
57 provider.static_file_provider().get_lowest_range(StaticFileSegment::Transactions) &&
58 input
59 .previous_checkpoint
60 .is_none_or(|checkpoint| checkpoint.block_number < Some(lowest_range.start()))
61 {
62 let new_checkpoint = lowest_range.start().saturating_sub(1);
63 if let Some(body_indices) = provider.block_body_indices(new_checkpoint)? {
64 input.previous_checkpoint = Some(PruneCheckpoint {
65 block_number: Some(new_checkpoint),
66 tx_number: Some(body_indices.last_tx_num()),
67 prune_mode: self.mode,
68 });
69 debug!(
70 target: "pruner",
71 static_file_checkpoint = ?input.previous_checkpoint,
72 "Using static file transaction checkpoint as TransactionLookup starting point"
73 );
74 }
75 }
76
77 let (start, end) = match input.get_next_tx_num_range(provider)? {
78 Some(range) => range,
79 None => {
80 trace!(target: "pruner", "No transaction lookup entries to prune");
81 return Ok(SegmentOutput::done())
82 }
83 }
84 .into_inner();
85 let tx_range = start..=
86 Some(end)
87 .min(input.limiter.deleted_entries_limit_left().map(|left| start + left as u64 - 1))
88 .unwrap();
89 let tx_range_end = *tx_range.end();
90
91 let hashes = provider
93 .transactions_by_tx_range(tx_range.clone())?
94 .into_par_iter()
95 .map(|transaction| transaction.trie_hash())
96 .collect::<Vec<_>>();
97
98 let tx_count = tx_range.count();
100 if hashes.len() != tx_count {
101 return Err(PrunerError::InconsistentData(
102 "Unexpected number of transaction hashes retrieved by transaction number range",
103 ))
104 }
105
106 let mut limiter = input.limiter;
107
108 let mut last_pruned_transaction = None;
109 let (pruned, done) =
110 provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
111 hashes,
112 &mut limiter,
113 |row| {
114 last_pruned_transaction =
115 Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
116 },
117 )?;
118
119 let done = done && tx_range_end == end;
120 trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");
121
122 let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);
123
124 let last_pruned_block = provider
125 .transaction_block(last_pruned_transaction)?
126 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
127 .checked_sub(if done { 0 } else { 1 });
131
132 let progress = limiter.progress(done);
133
134 Ok(SegmentOutput {
135 progress,
136 pruned,
137 checkpoint: Some(SegmentOutputCheckpoint {
138 block_number: last_pruned_block,
139 tx_number: Some(last_pruned_transaction),
140 }),
141 })
142 }
143}
144
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use itertools::{
        FoldWhile::{Continue, Done},
        Itertools,
    };
    use reth_db_api::tables;
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    /// Prunes the transaction lookup table in several rounds with a deleted-entries limit of 10
    /// and checks the number of remaining rows and the stored checkpoint after each round.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Ten blocks with exactly two transactions each.
        let params =
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
        let blocks = random_block_range(&mut rng, 1..=10, params);
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Assign consecutive transaction numbers, starting at zero, to every transaction hash.
        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            let first_tx_number = tx_hash_numbers.len();
            tx_hash_numbers.extend(
                block
                    .body()
                    .transactions
                    .iter()
                    .enumerate()
                    .map(|(offset, tx)| (*tx.tx_hash(), (first_tx_number + offset) as u64)),
            );
        }
        let total_lookup_entries = tx_hash_numbers.len();
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        // Sanity checks: every inserted transaction has a lookup entry.
        let stored_transactions = db.count_entries::<tables::Transactions>().unwrap();
        assert_eq!(
            stored_transactions,
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
        );
        assert_eq!(
            stored_transactions,
            db.table::<tables::TransactionHashNumbers>().unwrap().len()
        );

        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = TransactionLookup::new(prune_mode);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);

            // Checkpoint left behind by the previous round, if any.
            let stored_checkpoint = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::TransactionLookup)
                .unwrap();

            // First transaction number this round is expected to prune.
            let next_tx_number_to_prune = stored_checkpoint
                .as_ref()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map(|tx_number| tx_number + 1)
                .unwrap_or_default();

            let input = PruneInput {
                previous_checkpoint: stored_checkpoint,
                to_block,
                limiter: limiter.clone(),
            };

            // Expected last pruned transaction: everything up to `to_block`, capped by the
            // deleted-entries limit.
            let txs_up_to_target = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum::<usize>();
            let limit_bound =
                next_tx_number_to_prune as usize + input.limiter.deleted_entries_limit().unwrap();
            let last_pruned_tx_number = txs_up_to_target.min(limit_bound) - 1;

            // Number of the block that contains the expected last pruned transaction.
            let last_pruned_block_number = blocks
                .iter()
                .fold_while((0, 0), |(_, seen), block| {
                    let seen = seen + block.transaction_count();
                    let state = (block.number, seen);

                    if seen > last_pruned_tx_number {
                        Done(state)
                    } else {
                        Continue(state)
                    }
                })
                .into_inner()
                .0;

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            let checkpoint_to_save = result.checkpoint.unwrap().as_prune_checkpoint(prune_mode);
            segment.save_checkpoint(&provider, checkpoint_to_save).unwrap();
            provider.commit().expect("commit");

            // An unfinished round is expected to report the block before the last touched one.
            let expected_block_number = last_pruned_block_number
                .checked_sub(if result.progress.is_finished() { 0 } else { 1 });

            assert_eq!(
                db.table::<tables::TransactionHashNumbers>().unwrap().len(),
                total_lookup_entries - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::TransactionLookup)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: expected_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        test_prune(
            6,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
        );
        test_prune(6, (PruneProgress::Finished, 2));
        test_prune(10, (PruneProgress::Finished, 8));
    }
}