reth_prune/segments/user/
transaction_lookup.rs
1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment, SegmentOutput},
4 PrunerError,
5};
6use alloy_eips::eip2718::Encodable2718;
7use rayon::prelude::*;
8use reth_db_api::{tables, transaction::DbTxMut};
9use reth_provider::{BlockReader, DBProvider};
10use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
11use tracing::{instrument, trace};
12
13#[derive(Debug)]
14pub struct TransactionLookup {
15 mode: PruneMode,
16}
17
18impl TransactionLookup {
19 pub const fn new(mode: PruneMode) -> Self {
20 Self { mode }
21 }
22}
23
24impl<Provider> Segment<Provider> for TransactionLookup
25where
26 Provider: DBProvider<Tx: DbTxMut> + BlockReader<Transaction: Encodable2718>,
27{
28 fn segment(&self) -> PruneSegment {
29 PruneSegment::TransactionLookup
30 }
31
32 fn mode(&self) -> Option<PruneMode> {
33 Some(self.mode)
34 }
35
36 fn purpose(&self) -> PrunePurpose {
37 PrunePurpose::User
38 }
39
40 #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
41 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
42 let (start, end) = match input.get_next_tx_num_range(provider)? {
43 Some(range) => range,
44 None => {
45 trace!(target: "pruner", "No transaction lookup entries to prune");
46 return Ok(SegmentOutput::done())
47 }
48 }
49 .into_inner();
50 let tx_range = start..=
51 Some(end)
52 .min(input.limiter.deleted_entries_limit_left().map(|left| start + left as u64 - 1))
53 .unwrap();
54 let tx_range_end = *tx_range.end();
55
56 let hashes = provider
58 .transactions_by_tx_range(tx_range.clone())?
59 .into_par_iter()
60 .map(|transaction| transaction.trie_hash())
61 .collect::<Vec<_>>();
62
63 let tx_count = tx_range.count();
65 if hashes.len() != tx_count {
66 return Err(PrunerError::InconsistentData(
67 "Unexpected number of transaction hashes retrieved by transaction number range",
68 ))
69 }
70
71 let mut limiter = input.limiter;
72
73 let mut last_pruned_transaction = None;
74 let (pruned, done) =
75 provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
76 hashes,
77 &mut limiter,
78 |row| {
79 last_pruned_transaction =
80 Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
81 },
82 )?;
83
84 let done = done && tx_range_end == end;
85 trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");
86
87 let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);
88
89 let last_pruned_block = provider
90 .transaction_block(last_pruned_transaction)?
91 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
92 .checked_sub(if done { 0 } else { 1 });
96
97 let progress = limiter.progress(done);
98
99 Ok(SegmentOutput {
100 progress,
101 pruned,
102 checkpoint: Some(SegmentOutputCheckpoint {
103 block_number: last_pruned_block,
104 tx_number: Some(last_pruned_transaction),
105 }),
106 })
107 }
108}
109
110#[cfg(test)]
111mod tests {
112 use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
113 use alloy_primitives::{BlockNumber, TxNumber, B256};
114 use assert_matches::assert_matches;
115 use itertools::{
116 FoldWhile::{Continue, Done},
117 Itertools,
118 };
119 use reth_db_api::tables;
120 use reth_primitives_traits::SignedTransaction;
121 use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
122 use reth_prune_types::{
123 PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
124 };
125 use reth_stages::test_utils::{StorageKind, TestStageDB};
126 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
127 use std::ops::Sub;
128
129 #[test]
130 fn prune() {
131 let db = TestStageDB::default();
132 let mut rng = generators::rng();
133
134 let blocks = random_block_range(
135 &mut rng,
136 1..=10,
137 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
138 );
139 db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
140
141 let mut tx_hash_numbers = Vec::new();
142 for block in &blocks {
143 tx_hash_numbers.reserve_exact(block.transaction_count());
144 for transaction in &block.body().transactions {
145 tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_numbers.len() as u64));
146 }
147 }
148 let tx_hash_numbers_len = tx_hash_numbers.len();
149 db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");
150
151 assert_eq!(
152 db.table::<tables::Transactions>().unwrap().len(),
153 blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
154 );
155 assert_eq!(
156 db.table::<tables::Transactions>().unwrap().len(),
157 db.table::<tables::TransactionHashNumbers>().unwrap().len()
158 );
159
160 let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
161 let prune_mode = PruneMode::Before(to_block);
162 let segment = TransactionLookup::new(prune_mode);
163 let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
164 let input = PruneInput {
165 previous_checkpoint: db
166 .factory
167 .provider()
168 .unwrap()
169 .get_prune_checkpoint(PruneSegment::TransactionLookup)
170 .unwrap(),
171 to_block,
172 limiter: limiter.clone(),
173 };
174
175 let next_tx_number_to_prune = db
176 .factory
177 .provider()
178 .unwrap()
179 .get_prune_checkpoint(PruneSegment::TransactionLookup)
180 .unwrap()
181 .and_then(|checkpoint| checkpoint.tx_number)
182 .map(|tx_number| tx_number + 1)
183 .unwrap_or_default();
184
185 let last_pruned_tx_number = blocks
186 .iter()
187 .take(to_block as usize)
188 .map(|block| block.transaction_count())
189 .sum::<usize>()
190 .min(
191 next_tx_number_to_prune as usize +
192 input.limiter.deleted_entries_limit().unwrap(),
193 )
194 .sub(1);
195
196 let last_pruned_block_number = blocks
197 .iter()
198 .fold_while((0, 0), |(_, mut tx_count), block| {
199 tx_count += block.transaction_count();
200
201 if tx_count > last_pruned_tx_number {
202 Done((block.number, tx_count))
203 } else {
204 Continue((block.number, tx_count))
205 }
206 })
207 .into_inner()
208 .0;
209
210 let provider = db.factory.database_provider_rw().unwrap();
211 let result = segment.prune(&provider, input).unwrap();
212 limiter.increment_deleted_entries_count_by(result.pruned);
213
214 assert_matches!(
215 result,
216 SegmentOutput {progress, pruned, checkpoint: Some(_)}
217 if (progress, pruned) == expected_result
218 );
219
220 segment
221 .save_checkpoint(
222 &provider,
223 result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
224 )
225 .unwrap();
226 provider.commit().expect("commit");
227
228 let last_pruned_block_number = last_pruned_block_number
229 .checked_sub(if result.progress.is_finished() { 0 } else { 1 });
230
231 assert_eq!(
232 db.table::<tables::TransactionHashNumbers>().unwrap().len(),
233 tx_hash_numbers_len - (last_pruned_tx_number + 1)
234 );
235 assert_eq!(
236 db.factory
237 .provider()
238 .unwrap()
239 .get_prune_checkpoint(PruneSegment::TransactionLookup)
240 .unwrap(),
241 Some(PruneCheckpoint {
242 block_number: last_pruned_block_number,
243 tx_number: Some(last_pruned_tx_number as TxNumber),
244 prune_mode
245 })
246 );
247 };
248
249 test_prune(
250 6,
251 (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
252 );
253 test_prune(6, (PruneProgress::Finished, 2));
254 test_prune(10, (PruneProgress::Finished, 8));
255 }
256}