Skip to main content

reth_prune/segments/user/
bodies.rs

1use crate::{
2    segments::{self, PruneInput, Segment},
3    PrunerError,
4};
5use alloy_primitives::BlockNumber;
6use reth_provider::{BlockReader, PruneCheckpointReader, StaticFileProviderFactory};
7use reth_prune_types::{
8    PruneInterruptReason, PruneMode, PrunePurpose, PruneSegment, SegmentOutput,
9    SegmentOutputCheckpoint,
10};
11use reth_static_file_types::StaticFileSegment;
12use tracing::{debug, instrument};
13
/// Segment responsible for pruning transactions in static files.
///
/// This segment is controlled by the `bodies_history` configuration. The actual deletion is
/// delegated to `segments::prune_static_files` on the [`StaticFileSegment::Transactions`]
/// segment, and can be held back by transaction lookup pruning when `tx_lookup_mode` is set.
#[derive(Debug)]
pub struct Bodies {
    /// Prune mode for transaction bodies; this is what [`Segment::mode`] reports.
    mode: PruneMode,
    /// Transaction lookup prune mode. Used to determine if we need to wait for tx lookup pruning
    /// before deleting transaction bodies.
    tx_lookup_mode: Option<PruneMode>,
}
24
25impl Bodies {
26    /// Creates a new [`Bodies`] segment with the given prune mode and optional transaction lookup
27    /// prune mode for coordination.
28    pub const fn new(mode: PruneMode, tx_lookup_mode: Option<PruneMode>) -> Self {
29        Self { mode, tx_lookup_mode }
30    }
31
32    /// Returns the next best block that bodies can prune up to considering the transaction lookup
33    /// pruning configuration (if any) and progress.
34    ///
35    /// Returns `None` if there's no block available to prune (e.g., waiting on `tx_lookup`).
36    fn next_bodies_prune_target<Provider>(
37        &self,
38        provider: &Provider,
39        input: &PruneInput,
40    ) -> Result<Option<BlockNumber>, PrunerError>
41    where
42        Provider: PruneCheckpointReader,
43    {
44        let Some(tx_lookup_mode) = self.tx_lookup_mode else { return Ok(Some(input.to_block)) };
45
46        let tx_lookup_checkpoint = provider
47            .get_prune_checkpoint(PruneSegment::TransactionLookup)?
48            .and_then(|cp| cp.block_number);
49
50        // Determine the safe prune target, if any.
51        // tx_lookup's next_pruned_block tells us what block it will prune next.
52        // - None: tx_lookup will never prune more blocks (e.g. Before(N) reached its target), so
53        //   bodies can prune freely
54        // - Some(next) > to_block: tx_lookup is ahead of our target, so we're safe to prune
55        //   to_block
56        // - Some(next) <= to_block: tx_lookup still needs to prune blocks we want to delete, so we
57        //   must wait and only prune up to (next - 1) to preserve tx data it needs
58        let to_block = match tx_lookup_mode.next_pruned_block(tx_lookup_checkpoint) {
59            None => Some(input.to_block),
60            Some(tx_lookup_next) if tx_lookup_next > input.to_block => Some(input.to_block),
61            Some(tx_lookup_next) => {
62                // We can only prune bodies up to the block BEFORE tx_lookup's next target.
63                // tx_lookup_next is the next block tx_lookup will prune, meaning it still needs
64                // to read transactions from that block. We must preserve those transactions,
65                // so bodies can only safely delete up to (tx_lookup_next - 1).
66                let Some(safe) = tx_lookup_next.checked_sub(1) else {
67                    return Ok(None);
68                };
69
70                if input.previous_checkpoint.is_some_and(|cp| cp.block_number.unwrap_or(0) >= safe)
71                {
72                    // we have pruned what we can
73                    return Ok(None)
74                }
75
76                debug!(
77                    target: "pruner",
78                    to_block = input.to_block,
79                    safe,
80                    "Bodies pruning limited by tx_lookup progress"
81                );
82                Some(safe)
83            }
84        };
85
86        Ok(to_block)
87    }
88}
89
90impl<Provider> Segment<Provider> for Bodies
91where
92    Provider: StaticFileProviderFactory + BlockReader + PruneCheckpointReader,
93{
94    fn segment(&self) -> PruneSegment {
95        PruneSegment::Bodies
96    }
97
98    fn mode(&self) -> Option<PruneMode> {
99        Some(self.mode)
100    }
101
102    fn purpose(&self) -> PrunePurpose {
103        PrunePurpose::User
104    }
105
106    #[instrument(
107        name = "Bodies::prune",
108        target = "pruner",
109        skip(self, provider),
110        ret(level = "trace")
111    )]
112    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
113        let Some(to_block) = self.next_bodies_prune_target(provider, &input)? else {
114            debug!(
115                to_block = input.to_block,
116                "Transaction lookup still has work to be done up to target block"
117            );
118            return Ok(SegmentOutput::not_done(
119                PruneInterruptReason::WaitingOnSegment(PruneSegment::TransactionLookup),
120                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
121            ));
122        };
123
124        // Use the coordinated to_block instead of input.to_block
125        let adjusted_input = PruneInput { to_block, ..input };
126        segments::prune_static_files(provider, adjusted_input, StaticFileSegment::Transactions)
127    }
128}
129
130#[cfg(test)]
131mod tests {
132    use super::*;
133    use crate::Pruner;
134    use alloy_primitives::BlockNumber;
135    use reth_exex_types::FinishedExExHeight;
136    use reth_provider::{
137        test_utils::{create_test_provider_factory, MockNodeTypesWithDB},
138        DBProvider, DatabaseProviderFactory, ProviderFactory, PruneCheckpointWriter,
139        StaticFileWriter,
140    };
141    use reth_prune_types::{PruneMode, PruneProgress, PruneSegment};
142    use reth_static_file_types::{
143        SegmentHeader, SegmentRangeInclusive, StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
144    };
145
146    /// Creates empty static file jars at 500k block intervals up to the tip block.
147    ///
148    /// Each jar contains sequential transaction ranges for testing deletion logic.
149    fn setup_static_file_jars<P: StaticFileProviderFactory>(provider: &P, tip_block: u64) {
150        let num_jars = (tip_block + 1) / DEFAULT_BLOCKS_PER_STATIC_FILE;
151        let txs_per_jar = 1000;
152        let static_file_provider = provider.static_file_provider();
153
154        let mut writer =
155            static_file_provider.latest_writer(StaticFileSegment::Transactions).unwrap();
156
157        for jar_idx in 0..num_jars {
158            let block_start = jar_idx * DEFAULT_BLOCKS_PER_STATIC_FILE;
159            let block_end = ((jar_idx + 1) * DEFAULT_BLOCKS_PER_STATIC_FILE - 1).min(tip_block);
160
161            let tx_start = jar_idx * txs_per_jar;
162            let tx_end = tx_start + txs_per_jar - 1;
163
164            *writer.user_header_mut() = SegmentHeader::new(
165                SegmentRangeInclusive::new(block_start, block_end),
166                Some(SegmentRangeInclusive::new(block_start, block_end)),
167                Some(SegmentRangeInclusive::new(tx_start, tx_end)),
168                StaticFileSegment::Transactions,
169            );
170
171            writer.inner().set_dirty();
172            writer.commit().expect("commit empty jar");
173
174            if jar_idx < num_jars - 1 {
175                writer.increment_block(block_end + 1).expect("increment block");
176            }
177        }
178
179        static_file_provider.initialize_index().expect("initialize index");
180    }
181
    /// A single bodies-pruning scenario: the segment configuration to run with and the expected
    /// outcome, consumed by `run_prune_test`.
    struct TestCase {
        /// Transaction lookup prune mode passed to [`Bodies::new`].
        tx_lookup_mode: Option<PruneMode>,
        /// If set, a `TransactionLookup` prune checkpoint at this block is saved before pruning.
        tx_lookup_checkpoint_block: Option<BlockNumber>,
        /// Prune mode for the bodies segment.
        bodies_mode: PruneMode,
        /// Expected `pruned` count in the bodies segment output.
        expected_pruned: usize,
        /// If set, the expected lowest range end of the `Transactions` static files after pruning.
        expected_lowest_block: Option<BlockNumber>,
        /// Expected progress reported by the pruner run.
        expected_progress: PruneProgress,
    }
190
191    impl TestCase {
192        fn new() -> Self {
193            Self {
194                tx_lookup_mode: None,
195                tx_lookup_checkpoint_block: None,
196                bodies_mode: PruneMode::Full,
197                expected_pruned: 0,
198                expected_lowest_block: None,
199                expected_progress: PruneProgress::Finished,
200            }
201        }
202
203        fn with_bodies_mode(mut self, mode: PruneMode) -> Self {
204            self.bodies_mode = mode;
205            self
206        }
207
208        fn with_expected_pruned(mut self, pruned: usize) -> Self {
209            self.expected_pruned = pruned;
210            self
211        }
212
213        fn with_expected_progress(mut self, progress: PruneProgress) -> Self {
214            self.expected_progress = progress;
215            self
216        }
217
218        fn with_lowest_block(mut self, block: BlockNumber) -> Self {
219            self.expected_lowest_block = Some(block);
220            self
221        }
222
223        fn with_tx_lookup(mut self, mode: PruneMode, checkpoint: Option<BlockNumber>) -> Self {
224            self.tx_lookup_mode = Some(mode);
225            self.tx_lookup_checkpoint_block = checkpoint;
226            self
227        }
228    }
229
230    fn run_prune_test(
231        factory: &ProviderFactory<MockNodeTypesWithDB>,
232        test_case: TestCase,
233        tip: BlockNumber,
234    ) {
235        let (_, finished_exex_height_rx) = tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
236
237        // Capture highest block before pruning
238        let static_provider = factory.static_file_provider();
239        let highest_before =
240            static_provider.get_highest_static_file_block(StaticFileSegment::Transactions);
241
242        // Set up tx_lookup checkpoint if provided
243        if let Some(checkpoint_block) = test_case.tx_lookup_checkpoint_block {
244            let provider = factory.database_provider_rw().unwrap();
245            provider
246                .save_prune_checkpoint(
247                    PruneSegment::TransactionLookup,
248                    reth_prune_types::PruneCheckpoint {
249                        block_number: Some(checkpoint_block),
250                        tx_number: None,
251                        prune_mode: test_case.tx_lookup_mode.unwrap(),
252                    },
253                )
254                .unwrap();
255            provider.commit().unwrap();
256        }
257
258        let bodies = Bodies::new(test_case.bodies_mode, test_case.tx_lookup_mode);
259        let segments: Vec<Box<dyn Segment<_>>> = vec![Box::new(bodies)];
260
261        let mut pruner = Pruner::new_with_factory(
262            factory.clone(),
263            segments,
264            5,
265            10000,
266            None,
267            finished_exex_height_rx,
268        );
269
270        let result = pruner.run(tip).expect("pruner run");
271
272        assert_eq!(result.progress, test_case.expected_progress);
273        assert_eq!(result.segments.len(), 1);
274
275        let (segment, output) = &result.segments[0];
276        assert_eq!(*segment, PruneSegment::Bodies);
277        assert_eq!(output.pruned, test_case.expected_pruned);
278
279        if let Some(expected_lowest) = test_case.expected_lowest_block {
280            let static_provider = factory.static_file_provider();
281            assert_eq!(
282                static_provider.get_lowest_range_end(StaticFileSegment::Transactions),
283                Some(expected_lowest)
284            );
285            assert_eq!(
286                static_provider.get_highest_static_file_block(StaticFileSegment::Transactions),
287                highest_before
288            );
289        }
290    }
291
292    #[test]
293    fn bodies_prune_through_pruner() {
294        let factory = create_test_provider_factory();
295        let tip = 2_499_999;
296        setup_static_file_jars(&factory, tip);
297
298        let (_, _finished_exex_height_rx) =
299            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
300
301        let test_cases = vec![
302            // Test 1: PruneMode::Before(750_000) → deletes jar 0 (0-499_999)
303            // Checkpoint 499_999 != target 749_999 -> HasMoreData
304            TestCase::new()
305                .with_bodies_mode(PruneMode::Before(750_000))
306                .with_expected_pruned(1000)
307                .with_lowest_block(999_999),
308            // Test 2: PruneMode::Before(850_000) → no deletion (jar 1: 500_000-999_999 contains
309            // target)
310            TestCase::new().with_bodies_mode(PruneMode::Before(850_000)).with_lowest_block(999_999),
311            // Test 3: PruneMode::Before(1_599_999) → deletes jars 0 and 1 (0-999_999)
312            // Checkpoint 999_999 != target 1_599_998 -> HasMoreData
313            TestCase::new()
314                .with_bodies_mode(PruneMode::Before(1_599_999))
315                .with_expected_pruned(2000)
316                .with_lowest_block(1_999_999),
317            // Test 4: PruneMode::Distance(500_000) with tip=2_499_999 → deletes jar 3
318            // (1_500_000-1_999_999) Checkpoint 1_999_999 == target 1_999_999 ->
319            // Finished
320            TestCase::new()
321                .with_bodies_mode(PruneMode::Distance(500_000))
322                .with_expected_pruned(1000)
323                .with_lowest_block(2_499_999),
324            // Test 5: PruneMode::Before(2_300_000) → no deletion (jar 4: 2_000_000-2_499_999
325            // contains target)
326            TestCase::new()
327                .with_bodies_mode(PruneMode::Before(2_300_000))
328                .with_lowest_block(2_499_999),
329        ];
330
331        for test_case in test_cases {
332            run_prune_test(&factory, test_case, tip);
333        }
334    }
335
336    #[test]
337    fn checkpoint_reflects_deleted_files_not_target() {
338        // Test that checkpoint is set to the highest deleted block, not to_block.
339        // When to_block falls in the middle of an undeleted file, checkpoint should reflect
340        // what was actually deleted.
341        let factory = create_test_provider_factory();
342        let tip = 1_499_999;
343        setup_static_file_jars(&factory, tip);
344
345        // Use PruneMode::Before(900_000) which targets 899_999.
346        // This should delete jar 0 (0-499_999) since it's entirely below the target.
347        // Jar 1 (500_000-999_999) contains the target, so it won't be deleted.
348        // Checkpoint should be 499_999 (end of jar 0), not 899_999 (to_block).
349        let bodies = Bodies::new(PruneMode::Before(900_000), None);
350        let segments: Vec<Box<dyn Segment<_>>> = vec![Box::new(bodies)];
351
352        let (_, finished_exex_height_rx) = tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
353
354        let mut pruner =
355            Pruner::new_with_factory(factory, segments, 5, 10000, None, finished_exex_height_rx);
356
357        let result = pruner.run(tip).expect("pruner run");
358
359        assert_eq!(result.progress, PruneProgress::Finished);
360        assert_eq!(result.segments.len(), 1);
361
362        let (segment, output) = &result.segments[0];
363        assert_eq!(*segment, PruneSegment::Bodies);
364
365        // Verify checkpoint is set to the end of deleted jar (499_999), not to_block (899_999)
366        let checkpoint_block = output.checkpoint.as_ref().and_then(|cp| cp.block_number);
367        assert_eq!(
368            checkpoint_block,
369            Some(499_999),
370            "Checkpoint should be 499_999 (end of deleted jar 0), not 899_999 (to_block)"
371        );
372    }
373
374    #[test]
375    fn min_block_updated_on_sync() {
376        // Regression test: update_index must update min_block to prevent stale values
377        // that can cause pruner to incorrectly delete static files when PruneMode::Before(0) is
378        // used.
379
380        struct MinBlockTestCase {
381            // Block range
382            initial_range: Option<SegmentRangeInclusive>,
383            updated_range: SegmentRangeInclusive,
384            // Min block
385            expected_before_update: Option<BlockNumber>,
386            expected_after_update: BlockNumber,
387            // Test delete_segment_below_block with this value
388            delete_below_block: BlockNumber,
389            // Expected number of deleted segments
390            expected_deleted: usize,
391        }
392
393        let test_cases = vec![
394            // Test 1: Empty initial state (None) -> syncs to block 100
395            MinBlockTestCase {
396                initial_range: None,
397                updated_range: SegmentRangeInclusive::new(0, 100),
398                expected_before_update: None,
399                expected_after_update: 100,
400                delete_below_block: 1,
401                expected_deleted: 0,
402            },
403            // Test 2: Genesis state [0..=0] -> syncs to block 100 (eg. node after init-state)
404            MinBlockTestCase {
405                initial_range: Some(SegmentRangeInclusive::new(0, 0)),
406                updated_range: SegmentRangeInclusive::new(0, 100),
407                expected_before_update: Some(0),
408                expected_after_update: 100,
409                delete_below_block: 1,
410                expected_deleted: 0,
411            },
412            // Test 3: Existing state [0..=50] -> syncs to block 200
413            MinBlockTestCase {
414                initial_range: Some(SegmentRangeInclusive::new(0, 50)),
415                updated_range: SegmentRangeInclusive::new(0, 200),
416                expected_before_update: Some(50),
417                expected_after_update: 200,
418                delete_below_block: 150,
419                expected_deleted: 0,
420            },
421        ];
422
423        for (
424            idx,
425            MinBlockTestCase {
426                initial_range,
427                updated_range,
428                expected_before_update,
429                expected_after_update,
430                delete_below_block,
431                expected_deleted,
432            },
433        ) in test_cases.into_iter().enumerate()
434        {
435            let factory = create_test_provider_factory();
436            let static_provider = factory.static_file_provider();
437
438            let mut writer =
439                static_provider.latest_writer(StaticFileSegment::Transactions).unwrap();
440
441            // Set up initial state if provided
442            if let Some(initial_range) = initial_range {
443                *writer.user_header_mut() = SegmentHeader::new(
444                    // Expected block range needs to have a fixed size that's determined by the
445                    // provider itself
446                    static_provider
447                        .find_fixed_range(StaticFileSegment::Transactions, initial_range.start()),
448                    Some(initial_range),
449                    Some(initial_range),
450                    StaticFileSegment::Transactions,
451                );
452                writer.inner().set_dirty();
453                writer.commit().unwrap();
454                static_provider.initialize_index().unwrap();
455            }
456
457            // Verify initial state
458            assert_eq!(
459                static_provider.get_lowest_range_end(StaticFileSegment::Transactions),
460                expected_before_update,
461                "Test case {}: Initial min_block mismatch",
462                idx
463            );
464
465            // Update to new block and tx ranges
466            writer.user_header_mut().set_block_range(updated_range.start(), updated_range.end());
467            writer.user_header_mut().set_tx_range(updated_range.start(), updated_range.end());
468            writer.inner().set_dirty();
469            writer.commit().unwrap(); // update_index is called inside
470
471            // Verify min_block was updated (not stuck at stale value)
472            assert_eq!(
473                static_provider.get_lowest_range_end(StaticFileSegment::Transactions),
474                Some(expected_after_update),
475                "Test case {}: min_block should be updated to {} (not stuck at stale value)",
476                idx,
477                expected_after_update
478            );
479
480            // Verify delete_segment_below_block behaves correctly with updated min_block
481            let deleted = static_provider
482                .delete_segment_below_block(StaticFileSegment::Transactions, delete_below_block)
483                .unwrap();
484
485            assert_eq!(deleted.len(), expected_deleted);
486        }
487    }
488
489    #[test]
490    fn bodies_with_tx_lookup_coordination() {
491        // Test that bodies pruning correctly coordinates with tx lookup pruning
492        // Using tip = 1_523_000 creates 4 static file jars:
493        // - Jar 0: blocks 0-499_999, txs 0-999
494        // - Jar 1: blocks 500_000-999_999, txs 1000-1999
495        // - Jar 2: blocks 1_000_000-1_499_999, txs 2000-2999
496        // - Jar 3: blocks 1_500_000-1_523_000, txs 3000-3999
497        let tip = 1_523_000;
498
499        let test_cases = vec![
500            // Scenario 1: tx_lookup disabled, bodies can prune freely (deletes jar 0)
501            // Checkpoint is 499_999 (end of jar 0), target is 599_999, so HasMoreData
502            TestCase::new()
503                .with_bodies_mode(PruneMode::Before(600_000))
504                .with_expected_pruned(1000)
505                .with_lowest_block(999_999),
506            // Scenario 2: tx_lookup enabled but not run yet, bodies cannot prune
507            TestCase::new()
508                .with_tx_lookup(PruneMode::Before(600_000), None)
509                .with_bodies_mode(PruneMode::Before(600_000))
510                .with_expected_progress(PruneProgress::HasMoreData(
511                    PruneInterruptReason::WaitingOnSegment(PruneSegment::TransactionLookup),
512                ))
513                .with_lowest_block(499_999), // No jars deleted, jar 0 ends at 499_999
514            // Scenario 3: tx_lookup caught up to its target, bodies can prune freely
515            // Deletes jar 0, checkpoint is 499_999, target is 599_999 -> HasMoreData
516            TestCase::new()
517                .with_tx_lookup(PruneMode::Before(600_000), Some(599_999))
518                .with_bodies_mode(PruneMode::Before(600_000))
519                .with_expected_pruned(1000)
520                .with_lowest_block(999_999),
521            // Scenario 4: tx_lookup behind its target, bodies limited to tx_lookup checkpoint
522            // tx_lookup should prune up to 599_999, but checkpoint is only at 250_000
523            // bodies wants to prune up to 599_999, but limited to 250_000
524            // No jars deleted because jar 0 (0-499_999) ends beyond 250_000
525            TestCase::new()
526                .with_tx_lookup(PruneMode::Before(600_000), Some(250_000))
527                .with_bodies_mode(PruneMode::Before(600_000))
528                .with_lowest_block(499_999), // No jars deleted
529            // Scenario 5: Both use Distance, tx_lookup caught up
530            // With tip=1_523_000, Distance(500_000) targets block 1_023_000
531            // Deletes jars 0 and 1, checkpoint is 999_999, target is 1_023_000 -> HasMoreData
532            TestCase::new()
533                .with_tx_lookup(PruneMode::Distance(500_000), Some(1_023_000))
534                .with_bodies_mode(PruneMode::Distance(500_000))
535                .with_expected_pruned(2000)
536                .with_lowest_block(1_499_999),
537            // Scenario 6: Both use Distance, tx_lookup less aggressive (bigger distance) than
538            // bodies With tip=1_523_000:
539            // - tx_lookup: Distance(1_000_000) targets block 523_000, checkpoint at 523_000
540            // - bodies: Distance(500_000) targets block 1_023_000
541            // Bodies can prune up to what tx_lookup has finished (523_000), deleting jar 0
542            // Checkpoint is 499_999, target is 1_023_000 -> HasMoreData
543            TestCase::new()
544                .with_tx_lookup(PruneMode::Distance(1_000_000), Some(523_000))
545                .with_bodies_mode(PruneMode::Distance(500_000))
546                .with_expected_pruned(1000) // Jar 0 deleted
547                .with_lowest_block(999_999), // Jar 0 (0-499_999) deleted
548            // Scenario 7: tx_lookup more aggressive than bodies (deletes jar 0 and 1)
549            // tx_lookup: Before(1_100_000) -> prune up to 1_099_999
550            // bodies: Before(1_100_000) -> wants to prune up to 1_099_999
551            // Checkpoint is 999_999, target is 1_099_999 -> HasMoreData
552            TestCase::new()
553                .with_tx_lookup(PruneMode::Before(1_100_000), Some(1_099_999))
554                .with_bodies_mode(PruneMode::Before(1_100_000))
555                .with_expected_pruned(2000)
556                .with_lowest_block(1_499_999), // Jars 0 and 1 deleted
557            // Scenario 8: tx_lookup has lower target than bodies, but is done
558            // tx_lookup: Before(600_000) -> prune up to 599_999 (checkpoint at 599_999, DONE)
559            // bodies: Before(1_100_000) -> wants to prune up to 1_099_999
560            // Since tx_lookup is done (next_pruned_block returns None), bodies can prune freely
561            // Checkpoint is 999_999, target is 1_099_999 -> HasMoreData
562            TestCase::new()
563                .with_tx_lookup(PruneMode::Before(600_000), Some(599_999))
564                .with_bodies_mode(PruneMode::Before(1_100_000))
565                .with_expected_pruned(2000)
566                .with_lowest_block(1_499_999), // Jars 0 and 1 deleted
567            // Scenario 9: Perfect alignment - checkpoint equals target
568            // bodies: Before(1_000_000) -> targets 999_999
569            // Deletes jars 0 and 1 (0-999_999), checkpoint is 999_999 which equals target ->
570            // Finished
571            TestCase::new()
572                .with_bodies_mode(PruneMode::Before(1_000_000))
573                .with_expected_pruned(2000)
574                .with_expected_progress(PruneProgress::Finished)
575                .with_lowest_block(1_499_999), // Jars 0 and 1 deleted
576        ];
577
578        for test_case in test_cases {
579            let factory = create_test_provider_factory();
580            setup_static_file_jars(&factory, tip);
581            run_prune_test(&factory, test_case, tip);
582        }
583    }
584}