reth_prune/segments/user/
transaction_lookup.rs
1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment, SegmentOutput},
4 PrunerError,
5};
6use alloy_eips::eip2718::Encodable2718;
7use rayon::prelude::*;
8use reth_db_api::{tables, transaction::DbTxMut};
9use reth_provider::{BlockReader, DBProvider};
10use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
11use tracing::{instrument, trace};
12
13#[derive(Debug)]
14pub struct TransactionLookup {
15 mode: PruneMode,
16}
17
18impl TransactionLookup {
19 pub const fn new(mode: PruneMode) -> Self {
20 Self { mode }
21 }
22}
23
24impl<Provider> Segment<Provider> for TransactionLookup
25where
26 Provider: DBProvider<Tx: DbTxMut> + BlockReader<Transaction: Encodable2718>,
27{
28 fn segment(&self) -> PruneSegment {
29 PruneSegment::TransactionLookup
30 }
31
32 fn mode(&self) -> Option<PruneMode> {
33 Some(self.mode)
34 }
35
36 fn purpose(&self) -> PrunePurpose {
37 PrunePurpose::User
38 }
39
40 #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
41 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
42 let (start, end) = match input.get_next_tx_num_range(provider)? {
43 Some(range) => range,
44 None => {
45 trace!(target: "pruner", "No transaction lookup entries to prune");
46 return Ok(SegmentOutput::done())
47 }
48 }
49 .into_inner();
50 let tx_range = start..=
51 Some(end)
52 .min(input.limiter.deleted_entries_limit_left().map(|left| start + left as u64 - 1))
53 .unwrap();
54 let tx_range_end = *tx_range.end();
55
56 let hashes = provider
58 .transactions_by_tx_range(tx_range.clone())?
59 .into_par_iter()
60 .map(|transaction| transaction.trie_hash())
61 .collect::<Vec<_>>();
62
63 let tx_count = tx_range.count();
65 if hashes.len() != tx_count {
66 return Err(PrunerError::InconsistentData(
67 "Unexpected number of transaction hashes retrieved by transaction number range",
68 ))
69 }
70
71 let mut limiter = input.limiter;
72
73 let mut last_pruned_transaction = None;
74 let (pruned, done) =
75 provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
76 hashes,
77 &mut limiter,
78 |row| {
79 last_pruned_transaction =
80 Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
81 },
82 )?;
83
84 let done = done && tx_range_end == end;
85 trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");
86
87 let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);
88
89 let last_pruned_block = provider
90 .transaction_block(last_pruned_transaction)?
91 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
92 .checked_sub(if done { 0 } else { 1 });
96
97 let progress = limiter.progress(done);
98
99 Ok(SegmentOutput {
100 progress,
101 pruned,
102 checkpoint: Some(SegmentOutputCheckpoint {
103 block_number: last_pruned_block,
104 tx_number: Some(last_pruned_transaction),
105 }),
106 })
107 }
108}
109
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use itertools::{
        FoldWhile::{Continue, Done},
        Itertools,
    };
    use reth_db_api::tables;
    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
    use std::ops::Sub;

    /// Prunes transaction lookup entries in three passes with a deleted-entries limit of 10 and
    /// verifies the reported progress, the pruned count, the remaining table size and the saved
    /// checkpoint after every pass.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let block_params =
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
        let blocks = random_block_range(&mut rng, 1..=10, block_params);
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Pair every transaction hash with a sequential transaction number, in block order.
        let mut tx_hash_numbers = Vec::new();
        for block in &blocks {
            tx_hash_numbers.reserve_exact(block.transaction_count());
            for transaction in &block.body().transactions {
                let tx_number = tx_hash_numbers.len() as u64;
                tx_hash_numbers.push((*transaction.tx_hash(), tx_number));
            }
        }
        let tx_hash_numbers_len = tx_hash_numbers.len();
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        // Sanity checks: both tables hold one row per generated transaction.
        let transactions_len = db.table::<tables::Transactions>().unwrap().len();
        assert_eq!(
            transactions_len,
            blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
        );
        assert_eq!(transactions_len, db.table::<tables::TransactionHashNumbers>().unwrap().len());

        // Reads the currently stored checkpoint of the transaction lookup segment.
        let load_checkpoint = || {
            db.factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::TransactionLookup)
                .unwrap()
        };

        let check_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = TransactionLookup::new(prune_mode);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
            let input = PruneInput {
                previous_checkpoint: load_checkpoint(),
                to_block,
                limiter: limiter.clone(),
            };

            // First transaction number this pass is expected to start from.
            let next_tx_number_to_prune = load_checkpoint()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map(|tx_number| tx_number + 1)
                .unwrap_or_default();

            // Expected last pruned transaction: bounded by the target block and by the limit.
            let txs_up_to_target = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum::<usize>();
            let limit_bound =
                next_tx_number_to_prune as usize + input.limiter.deleted_entries_limit().unwrap();
            let last_pruned_tx_number = txs_up_to_target.min(limit_bound).sub(1);

            // Number of the first block whose cumulative transaction count exceeds the last
            // pruned transaction number.
            let last_pruned_block_number = blocks
                .iter()
                .fold_while((0, 0), |(_, mut tx_count), block| {
                    tx_count += block.transaction_count();

                    let state = (block.number, tx_count);
                    if tx_count > last_pruned_tx_number {
                        Done(state)
                    } else {
                        Continue(state)
                    }
                })
                .into_inner()
                .0;

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            let new_checkpoint = result.checkpoint.unwrap().as_prune_checkpoint(prune_mode);
            segment.save_checkpoint(&provider, new_checkpoint).unwrap();
            provider.commit().expect("commit");

            // An unfinished pass is expected to checkpoint the previous block.
            let block_decrement = if result.progress.is_finished() { 0 } else { 1 };
            let last_pruned_block_number = last_pruned_block_number.checked_sub(block_decrement);

            assert_eq!(
                db.table::<tables::TransactionHashNumbers>().unwrap().len(),
                tx_hash_numbers_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                load_checkpoint(),
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        let limit_reached =
            PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached);
        check_prune(6, (limit_reached, 10));
        check_prune(6, (PruneProgress::Finished, 2));
        check_prune(10, (PruneProgress::Finished, 8));
    }
}