1use crate::{
2 segments::{self, PruneInput, Segment},
3 PrunerError,
4};
5use alloy_primitives::BlockNumber;
6use reth_provider::{BlockReader, PruneCheckpointReader, StaticFileProviderFactory};
7use reth_prune_types::{
8 PruneInterruptReason, PruneMode, PrunePurpose, PruneSegment, SegmentOutput,
9 SegmentOutputCheckpoint,
10};
11use reth_static_file_types::StaticFileSegment;
12use tracing::{debug, instrument};
13
/// Prune segment for block bodies.
///
/// Its [`Segment::prune`] implementation prunes the [`StaticFileSegment::Transactions`] static
/// files through `segments::prune_static_files`.
#[derive(Debug)]
pub struct Bodies {
    /// Prune mode reported by this segment via [`Segment::mode`].
    mode: PruneMode,
    /// Prune mode of the transaction lookup segment, if any. When set, the prune target of a run
    /// is capped by the transaction lookup segment's checkpoint so that bodies are not pruned
    /// ahead of it.
    tx_lookup_mode: Option<PruneMode>,
}
24
25impl Bodies {
26 pub const fn new(mode: PruneMode, tx_lookup_mode: Option<PruneMode>) -> Self {
29 Self { mode, tx_lookup_mode }
30 }
31
32 fn next_bodies_prune_target<Provider>(
37 &self,
38 provider: &Provider,
39 input: &PruneInput,
40 ) -> Result<Option<BlockNumber>, PrunerError>
41 where
42 Provider: PruneCheckpointReader,
43 {
44 let Some(tx_lookup_mode) = self.tx_lookup_mode else { return Ok(Some(input.to_block)) };
45
46 let tx_lookup_checkpoint = provider
47 .get_prune_checkpoint(PruneSegment::TransactionLookup)?
48 .and_then(|cp| cp.block_number);
49
50 let to_block = match tx_lookup_mode.next_pruned_block(tx_lookup_checkpoint) {
59 None => Some(input.to_block),
60 Some(tx_lookup_next) if tx_lookup_next > input.to_block => Some(input.to_block),
61 Some(tx_lookup_next) => {
62 let Some(safe) = tx_lookup_next.checked_sub(1) else {
67 return Ok(None);
68 };
69
70 if input.previous_checkpoint.is_some_and(|cp| cp.block_number.unwrap_or(0) >= safe)
71 {
72 return Ok(None)
74 }
75
76 debug!(
77 target: "pruner",
78 to_block = input.to_block,
79 safe,
80 "Bodies pruning limited by tx_lookup progress"
81 );
82 Some(safe)
83 }
84 };
85
86 Ok(to_block)
87 }
88}
89
90impl<Provider> Segment<Provider> for Bodies
91where
92 Provider: StaticFileProviderFactory + BlockReader + PruneCheckpointReader,
93{
94 fn segment(&self) -> PruneSegment {
95 PruneSegment::Bodies
96 }
97
98 fn mode(&self) -> Option<PruneMode> {
99 Some(self.mode)
100 }
101
102 fn purpose(&self) -> PrunePurpose {
103 PrunePurpose::User
104 }
105
106 #[instrument(
107 name = "Bodies::prune",
108 target = "pruner",
109 skip(self, provider),
110 ret(level = "trace")
111 )]
112 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
113 let Some(to_block) = self.next_bodies_prune_target(provider, &input)? else {
114 debug!(
115 to_block = input.to_block,
116 "Transaction lookup still has work to be done up to target block"
117 );
118 return Ok(SegmentOutput::not_done(
119 PruneInterruptReason::WaitingOnSegment(PruneSegment::TransactionLookup),
120 input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
121 ));
122 };
123
124 let adjusted_input = PruneInput { to_block, ..input };
126 segments::prune_static_files(provider, adjusted_input, StaticFileSegment::Transactions)
127 }
128}
129
130#[cfg(test)]
131mod tests {
132 use super::*;
133 use crate::Pruner;
134 use alloy_primitives::BlockNumber;
135 use reth_exex_types::FinishedExExHeight;
136 use reth_provider::{
137 test_utils::{create_test_provider_factory, MockNodeTypesWithDB},
138 DBProvider, DatabaseProviderFactory, ProviderFactory, PruneCheckpointWriter,
139 StaticFileWriter,
140 };
141 use reth_prune_types::{PruneMode, PruneProgress, PruneSegment};
142 use reth_static_file_types::{
143 SegmentHeader, SegmentRangeInclusive, StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
144 };
145
146 fn setup_static_file_jars<P: StaticFileProviderFactory>(provider: &P, tip_block: u64) {
150 let num_jars = (tip_block + 1) / DEFAULT_BLOCKS_PER_STATIC_FILE;
151 let txs_per_jar = 1000;
152 let static_file_provider = provider.static_file_provider();
153
154 let mut writer =
155 static_file_provider.latest_writer(StaticFileSegment::Transactions).unwrap();
156
157 for jar_idx in 0..num_jars {
158 let block_start = jar_idx * DEFAULT_BLOCKS_PER_STATIC_FILE;
159 let block_end = ((jar_idx + 1) * DEFAULT_BLOCKS_PER_STATIC_FILE - 1).min(tip_block);
160
161 let tx_start = jar_idx * txs_per_jar;
162 let tx_end = tx_start + txs_per_jar - 1;
163
164 *writer.user_header_mut() = SegmentHeader::new(
165 SegmentRangeInclusive::new(block_start, block_end),
166 Some(SegmentRangeInclusive::new(block_start, block_end)),
167 Some(SegmentRangeInclusive::new(tx_start, tx_end)),
168 StaticFileSegment::Transactions,
169 );
170
171 writer.inner().set_dirty();
172 writer.commit().expect("commit empty jar");
173
174 if jar_idx < num_jars - 1 {
175 writer.increment_block(block_end + 1).expect("increment block");
176 }
177 }
178
179 static_file_provider.initialize_index().expect("initialize index");
180 }
181
    /// Inputs and expectations for a single `run_prune_test` invocation.
    struct TestCase {
        /// Transaction lookup prune mode handed to `Bodies::new`.
        tx_lookup_mode: Option<PruneMode>,
        /// When set, a `TransactionLookup` prune checkpoint at this block is saved before the
        /// pruner runs.
        tx_lookup_checkpoint_block: Option<BlockNumber>,
        /// Prune mode of the bodies segment under test.
        bodies_mode: PruneMode,
        /// Expected `pruned` count in the bodies segment output.
        expected_pruned: usize,
        /// When set, the expected lowest range end of the `Transactions` static files after the
        /// run.
        expected_lowest_block: Option<BlockNumber>,
        /// Expected overall progress reported by the pruner run.
        expected_progress: PruneProgress,
    }
190
191 impl TestCase {
192 fn new() -> Self {
193 Self {
194 tx_lookup_mode: None,
195 tx_lookup_checkpoint_block: None,
196 bodies_mode: PruneMode::Full,
197 expected_pruned: 0,
198 expected_lowest_block: None,
199 expected_progress: PruneProgress::Finished,
200 }
201 }
202
203 fn with_bodies_mode(mut self, mode: PruneMode) -> Self {
204 self.bodies_mode = mode;
205 self
206 }
207
208 fn with_expected_pruned(mut self, pruned: usize) -> Self {
209 self.expected_pruned = pruned;
210 self
211 }
212
213 fn with_expected_progress(mut self, progress: PruneProgress) -> Self {
214 self.expected_progress = progress;
215 self
216 }
217
218 fn with_lowest_block(mut self, block: BlockNumber) -> Self {
219 self.expected_lowest_block = Some(block);
220 self
221 }
222
223 fn with_tx_lookup(mut self, mode: PruneMode, checkpoint: Option<BlockNumber>) -> Self {
224 self.tx_lookup_mode = Some(mode);
225 self.tx_lookup_checkpoint_block = checkpoint;
226 self
227 }
228 }
229
230 fn run_prune_test(
231 factory: &ProviderFactory<MockNodeTypesWithDB>,
232 test_case: TestCase,
233 tip: BlockNumber,
234 ) {
235 let (_, finished_exex_height_rx) = tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
236
237 let static_provider = factory.static_file_provider();
239 let highest_before =
240 static_provider.get_highest_static_file_block(StaticFileSegment::Transactions);
241
242 if let Some(checkpoint_block) = test_case.tx_lookup_checkpoint_block {
244 let provider = factory.database_provider_rw().unwrap();
245 provider
246 .save_prune_checkpoint(
247 PruneSegment::TransactionLookup,
248 reth_prune_types::PruneCheckpoint {
249 block_number: Some(checkpoint_block),
250 tx_number: None,
251 prune_mode: test_case.tx_lookup_mode.unwrap(),
252 },
253 )
254 .unwrap();
255 provider.commit().unwrap();
256 }
257
258 let bodies = Bodies::new(test_case.bodies_mode, test_case.tx_lookup_mode);
259 let segments: Vec<Box<dyn Segment<_>>> = vec![Box::new(bodies)];
260
261 let mut pruner = Pruner::new_with_factory(
262 factory.clone(),
263 segments,
264 5,
265 10000,
266 None,
267 finished_exex_height_rx,
268 );
269
270 let result = pruner.run(tip).expect("pruner run");
271
272 assert_eq!(result.progress, test_case.expected_progress);
273 assert_eq!(result.segments.len(), 1);
274
275 let (segment, output) = &result.segments[0];
276 assert_eq!(*segment, PruneSegment::Bodies);
277 assert_eq!(output.pruned, test_case.expected_pruned);
278
279 if let Some(expected_lowest) = test_case.expected_lowest_block {
280 let static_provider = factory.static_file_provider();
281 assert_eq!(
282 static_provider.get_lowest_range_end(StaticFileSegment::Transactions),
283 Some(expected_lowest)
284 );
285 assert_eq!(
286 static_provider.get_highest_static_file_block(StaticFileSegment::Transactions),
287 highest_before
288 );
289 }
290 }
291
292 #[test]
293 fn bodies_prune_through_pruner() {
294 let factory = create_test_provider_factory();
295 let tip = 2_499_999;
296 setup_static_file_jars(&factory, tip);
297
298 let (_, _finished_exex_height_rx) =
299 tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
300
301 let test_cases = vec![
302 TestCase::new()
305 .with_bodies_mode(PruneMode::Before(750_000))
306 .with_expected_pruned(1000)
307 .with_lowest_block(999_999),
308 TestCase::new().with_bodies_mode(PruneMode::Before(850_000)).with_lowest_block(999_999),
311 TestCase::new()
314 .with_bodies_mode(PruneMode::Before(1_599_999))
315 .with_expected_pruned(2000)
316 .with_lowest_block(1_999_999),
317 TestCase::new()
321 .with_bodies_mode(PruneMode::Distance(500_000))
322 .with_expected_pruned(1000)
323 .with_lowest_block(2_499_999),
324 TestCase::new()
327 .with_bodies_mode(PruneMode::Before(2_300_000))
328 .with_lowest_block(2_499_999),
329 ];
330
331 for test_case in test_cases {
332 run_prune_test(&factory, test_case, tip);
333 }
334 }
335
336 #[test]
337 fn checkpoint_reflects_deleted_files_not_target() {
338 let factory = create_test_provider_factory();
342 let tip = 1_499_999;
343 setup_static_file_jars(&factory, tip);
344
345 let bodies = Bodies::new(PruneMode::Before(900_000), None);
350 let segments: Vec<Box<dyn Segment<_>>> = vec![Box::new(bodies)];
351
352 let (_, finished_exex_height_rx) = tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
353
354 let mut pruner =
355 Pruner::new_with_factory(factory, segments, 5, 10000, None, finished_exex_height_rx);
356
357 let result = pruner.run(tip).expect("pruner run");
358
359 assert_eq!(result.progress, PruneProgress::Finished);
360 assert_eq!(result.segments.len(), 1);
361
362 let (segment, output) = &result.segments[0];
363 assert_eq!(*segment, PruneSegment::Bodies);
364
365 let checkpoint_block = output.checkpoint.as_ref().and_then(|cp| cp.block_number);
367 assert_eq!(
368 checkpoint_block,
369 Some(499_999),
370 "Checkpoint should be 499_999 (end of deleted jar 0), not 899_999 (to_block)"
371 );
372 }
373
374 #[test]
375 fn min_block_updated_on_sync() {
376 struct MinBlockTestCase {
381 initial_range: Option<SegmentRangeInclusive>,
383 updated_range: SegmentRangeInclusive,
384 expected_before_update: Option<BlockNumber>,
386 expected_after_update: BlockNumber,
387 delete_below_block: BlockNumber,
389 expected_deleted: usize,
391 }
392
393 let test_cases = vec![
394 MinBlockTestCase {
396 initial_range: None,
397 updated_range: SegmentRangeInclusive::new(0, 100),
398 expected_before_update: None,
399 expected_after_update: 100,
400 delete_below_block: 1,
401 expected_deleted: 0,
402 },
403 MinBlockTestCase {
405 initial_range: Some(SegmentRangeInclusive::new(0, 0)),
406 updated_range: SegmentRangeInclusive::new(0, 100),
407 expected_before_update: Some(0),
408 expected_after_update: 100,
409 delete_below_block: 1,
410 expected_deleted: 0,
411 },
412 MinBlockTestCase {
414 initial_range: Some(SegmentRangeInclusive::new(0, 50)),
415 updated_range: SegmentRangeInclusive::new(0, 200),
416 expected_before_update: Some(50),
417 expected_after_update: 200,
418 delete_below_block: 150,
419 expected_deleted: 0,
420 },
421 ];
422
423 for (
424 idx,
425 MinBlockTestCase {
426 initial_range,
427 updated_range,
428 expected_before_update,
429 expected_after_update,
430 delete_below_block,
431 expected_deleted,
432 },
433 ) in test_cases.into_iter().enumerate()
434 {
435 let factory = create_test_provider_factory();
436 let static_provider = factory.static_file_provider();
437
438 let mut writer =
439 static_provider.latest_writer(StaticFileSegment::Transactions).unwrap();
440
441 if let Some(initial_range) = initial_range {
443 *writer.user_header_mut() = SegmentHeader::new(
444 static_provider
447 .find_fixed_range(StaticFileSegment::Transactions, initial_range.start()),
448 Some(initial_range),
449 Some(initial_range),
450 StaticFileSegment::Transactions,
451 );
452 writer.inner().set_dirty();
453 writer.commit().unwrap();
454 static_provider.initialize_index().unwrap();
455 }
456
457 assert_eq!(
459 static_provider.get_lowest_range_end(StaticFileSegment::Transactions),
460 expected_before_update,
461 "Test case {}: Initial min_block mismatch",
462 idx
463 );
464
465 writer.user_header_mut().set_block_range(updated_range.start(), updated_range.end());
467 writer.user_header_mut().set_tx_range(updated_range.start(), updated_range.end());
468 writer.inner().set_dirty();
469 writer.commit().unwrap(); assert_eq!(
473 static_provider.get_lowest_range_end(StaticFileSegment::Transactions),
474 Some(expected_after_update),
475 "Test case {}: min_block should be updated to {} (not stuck at stale value)",
476 idx,
477 expected_after_update
478 );
479
480 let deleted = static_provider
482 .delete_segment_below_block(StaticFileSegment::Transactions, delete_below_block)
483 .unwrap();
484
485 assert_eq!(deleted.len(), expected_deleted);
486 }
487 }
488
489 #[test]
490 fn bodies_with_tx_lookup_coordination() {
491 let tip = 1_523_000;
498
499 let test_cases = vec![
500 TestCase::new()
503 .with_bodies_mode(PruneMode::Before(600_000))
504 .with_expected_pruned(1000)
505 .with_lowest_block(999_999),
506 TestCase::new()
508 .with_tx_lookup(PruneMode::Before(600_000), None)
509 .with_bodies_mode(PruneMode::Before(600_000))
510 .with_expected_progress(PruneProgress::HasMoreData(
511 PruneInterruptReason::WaitingOnSegment(PruneSegment::TransactionLookup),
512 ))
513 .with_lowest_block(499_999), TestCase::new()
517 .with_tx_lookup(PruneMode::Before(600_000), Some(599_999))
518 .with_bodies_mode(PruneMode::Before(600_000))
519 .with_expected_pruned(1000)
520 .with_lowest_block(999_999),
521 TestCase::new()
526 .with_tx_lookup(PruneMode::Before(600_000), Some(250_000))
527 .with_bodies_mode(PruneMode::Before(600_000))
528 .with_lowest_block(499_999), TestCase::new()
533 .with_tx_lookup(PruneMode::Distance(500_000), Some(1_023_000))
534 .with_bodies_mode(PruneMode::Distance(500_000))
535 .with_expected_pruned(2000)
536 .with_lowest_block(1_499_999),
537 TestCase::new()
544 .with_tx_lookup(PruneMode::Distance(1_000_000), Some(523_000))
545 .with_bodies_mode(PruneMode::Distance(500_000))
546 .with_expected_pruned(1000) .with_lowest_block(999_999), TestCase::new()
553 .with_tx_lookup(PruneMode::Before(1_100_000), Some(1_099_999))
554 .with_bodies_mode(PruneMode::Before(1_100_000))
555 .with_expected_pruned(2000)
556 .with_lowest_block(1_499_999), TestCase::new()
563 .with_tx_lookup(PruneMode::Before(600_000), Some(599_999))
564 .with_bodies_mode(PruneMode::Before(1_100_000))
565 .with_expected_pruned(2000)
566 .with_lowest_block(1_499_999), TestCase::new()
572 .with_bodies_mode(PruneMode::Before(1_000_000))
573 .with_expected_pruned(2000)
574 .with_expected_progress(PruneProgress::Finished)
575 .with_lowest_block(1_499_999), ];
577
578 for test_case in test_cases {
579 let factory = create_test_provider_factory();
580 setup_static_file_jars(&factory, tip);
581 run_prune_test(&factory, test_case, tip);
582 }
583 }
584}