reth_prune/segments/user/
transaction_lookup.rs1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment, SegmentOutput},
4 PrunerError,
5};
6use alloy_eips::eip2718::Encodable2718;
7use rayon::prelude::*;
8use reth_db_api::{tables, transaction::DbTxMut};
9use reth_provider::{BlockReader, DBProvider, PruneCheckpointReader};
10use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
11use tracing::{debug, instrument, trace};
12
13#[derive(Debug)]
14pub struct TransactionLookup {
15 mode: PruneMode,
16}
17
18impl TransactionLookup {
19 pub const fn new(mode: PruneMode) -> Self {
20 Self { mode }
21 }
22}
23
24impl<Provider> Segment<Provider> for TransactionLookup
25where
26 Provider:
27 DBProvider<Tx: DbTxMut> + BlockReader<Transaction: Encodable2718> + PruneCheckpointReader,
28{
29 fn segment(&self) -> PruneSegment {
30 PruneSegment::TransactionLookup
31 }
32
33 fn mode(&self) -> Option<PruneMode> {
34 Some(self.mode)
35 }
36
37 fn purpose(&self) -> PrunePurpose {
38 PrunePurpose::User
39 }
40
41 #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
42 fn prune(
43 &self,
44 provider: &Provider,
45 mut input: PruneInput,
46 ) -> Result<SegmentOutput, PrunerError> {
47 if let Some(txs_checkpoint) = provider.get_prune_checkpoint(PruneSegment::Transactions)? &&
52 input
53 .previous_checkpoint
54 .is_none_or(|checkpoint| checkpoint.block_number < txs_checkpoint.block_number)
55 {
56 input.previous_checkpoint = Some(txs_checkpoint);
57 debug!(
58 target: "pruner",
59 transactions_checkpoint = ?input.previous_checkpoint,
60 "No TransactionLookup checkpoint found, using Transactions checkpoint as fallback"
61 );
62 }
63
64 let (start, end) = match input.get_next_tx_num_range(provider)? {
65 Some(range) => range,
66 None => {
67 trace!(target: "pruner", "No transaction lookup entries to prune");
68 return Ok(SegmentOutput::done())
69 }
70 }
71 .into_inner();
72 let tx_range = start..=
73 Some(end)
74 .min(input.limiter.deleted_entries_limit_left().map(|left| start + left as u64 - 1))
75 .unwrap();
76 let tx_range_end = *tx_range.end();
77
78 let hashes = provider
80 .transactions_by_tx_range(tx_range.clone())?
81 .into_par_iter()
82 .map(|transaction| transaction.trie_hash())
83 .collect::<Vec<_>>();
84
85 let tx_count = tx_range.count();
87 if hashes.len() != tx_count {
88 return Err(PrunerError::InconsistentData(
89 "Unexpected number of transaction hashes retrieved by transaction number range",
90 ))
91 }
92
93 let mut limiter = input.limiter;
94
95 let mut last_pruned_transaction = None;
96 let (pruned, done) =
97 provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
98 hashes,
99 &mut limiter,
100 |row| {
101 last_pruned_transaction =
102 Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
103 },
104 )?;
105
106 let done = done && tx_range_end == end;
107 trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");
108
109 let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);
110
111 let last_pruned_block = provider
112 .transaction_block(last_pruned_transaction)?
113 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
114 .checked_sub(if done { 0 } else { 1 });
118
119 let progress = limiter.progress(done);
120
121 Ok(SegmentOutput {
122 progress,
123 pruned,
124 checkpoint: Some(SegmentOutputCheckpoint {
125 block_number: last_pruned_block,
126 tx_number: Some(last_pruned_transaction),
127 }),
128 })
129 }
130}
131
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use itertools::{
        FoldWhile::{Continue, Done},
        Itertools,
    };
    use reth_db_api::tables;
    use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    /// Inserts ten random blocks with two transactions each, then prunes the lookup table in
    /// three steps with a deleted-entries limit of 10, checking after every step the reported
    /// progress, the number of remaining rows and the stored checkpoint.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        // Number transactions consecutively from zero, in block order.
        let tx_hash_numbers = blocks
            .iter()
            .flat_map(|block| block.body().transactions.iter())
            .enumerate()
            .map(|(tx_number, transaction)| (*transaction.tx_hash(), tx_number as u64))
            .collect::<Vec<_>>();
        let tx_hash_numbers_len = tx_hash_numbers.len();
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        let total_tx_count: usize = blocks.iter().map(|block| block.transaction_count()).sum();
        let stored_tx_count = db.count_entries::<tables::Transactions>().unwrap();
        assert_eq!(stored_tx_count, total_tx_count);
        assert_eq!(stored_tx_count, db.table::<tables::TransactionHashNumbers>().unwrap().len());

        // Reads the currently persisted checkpoint of the segment under test.
        let stored_checkpoint = || {
            db.factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::TransactionLookup)
                .unwrap()
        };

        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = TransactionLookup::new(prune_mode);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
            let input = PruneInput {
                previous_checkpoint: stored_checkpoint(),
                to_block,
                limiter: limiter.clone(),
            };

            // First transaction number not yet covered by the stored checkpoint.
            let next_tx_number_to_prune = stored_checkpoint()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map_or(0, |tx_number| tx_number + 1);

            // Expected last pruned transaction: bounded by the target block and by the limit.
            let txs_up_to_target: usize = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.transaction_count())
                .sum();
            let limit_bound =
                next_tx_number_to_prune as usize + input.limiter.deleted_entries_limit().unwrap();
            let last_pruned_tx_number = txs_up_to_target.min(limit_bound) - 1;

            // Block that contains the expected last pruned transaction.
            let (block_of_last_pruned_tx, _) = blocks
                .iter()
                .fold_while((0, 0), |(_, seen_txs), block| {
                    let seen_txs = seen_txs + block.transaction_count();
                    let state = (block.number, seen_txs);

                    if seen_txs > last_pruned_tx_number {
                        Done(state)
                    } else {
                        Continue(state)
                    }
                })
                .into_inner();

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // An unfinished run is expected to checkpoint the block before the last touched one.
            let block_offset = if result.progress.is_finished() { 0 } else { 1 };
            let last_pruned_block_number = block_of_last_pruned_tx.checked_sub(block_offset);

            assert_eq!(
                db.table::<tables::TransactionHashNumbers>().unwrap().len(),
                tx_hash_numbers_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                stored_checkpoint(),
                Some(PruneCheckpoint {
                    block_number: last_pruned_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        let cases = [
            (6, (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10)),
            (6, (PruneProgress::Finished, 2)),
            (10, (PruneProgress::Finished, 8)),
        ];
        for (to_block, expected_result) in cases {
            test_prune(to_block, expected_result);
        }
    }
}