Skip to main content

reth_prune/segments/
mod.rs

1mod receipts;
2mod set;
3mod user;
4
5use crate::{PruneLimiter, PrunerError};
6use alloy_primitives::{BlockNumber, TxNumber};
7use reth_provider::{
8    errors::provider::ProviderResult, BlockReader, PruneCheckpointWriter, StaticFileProviderFactory,
9};
10use reth_prune_types::{
11    PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput,
12    SegmentOutputCheckpoint,
13};
14use reth_stages_types::StageId;
15use reth_static_file_types::StaticFileSegment;
16pub use set::SegmentSet;
17use std::{fmt::Debug, ops::RangeInclusive};
18use tracing::error;
19pub use user::{
20    AccountHistory, Bodies, Receipts as UserReceipts, ReceiptsByLogs, SenderRecovery,
21    StorageHistory, TransactionLookup,
22};
23
24/// Prunes data from static files for a given segment.
25///
26/// This is a generic helper function used by both receipts and bodies pruning
27/// when data is stored in static files.
28///
29/// The checkpoint block number is set to the highest block in the actually deleted files,
30/// not `input.to_block`, since `to_block` might refer to a block in the middle of an
31/// undeleted file.
32pub(crate) fn prune_static_files<Provider>(
33    provider: &Provider,
34    input: PruneInput,
35    segment: StaticFileSegment,
36) -> Result<SegmentOutput, PrunerError>
37where
38    Provider: StaticFileProviderFactory,
39{
40    let deleted_headers =
41        provider.static_file_provider().delete_segment_below_block(segment, input.to_block + 1)?;
42
43    if deleted_headers.is_empty() {
44        return Ok(SegmentOutput {
45            progress: PruneProgress::Finished,
46            pruned: 0,
47            checkpoint: input
48                .previous_checkpoint
49                .map(SegmentOutputCheckpoint::from_prune_checkpoint),
50        })
51    }
52
53    let tx_ranges = deleted_headers.iter().filter_map(|header| header.tx_range());
54
55    let pruned = tx_ranges.clone().map(|range| range.len()).sum::<u64>() as usize;
56
57    // The highest block number in the deleted files is the actual checkpoint.
58    let checkpoint_block = deleted_headers
59        .iter()
60        .filter_map(|header| header.block_range())
61        .map(|range| range.end())
62        .max();
63
64    Ok(SegmentOutput {
65        progress: PruneProgress::Finished,
66        pruned,
67        checkpoint: Some(SegmentOutputCheckpoint {
68            block_number: checkpoint_block,
69            tx_number: tx_ranges.map(|range| range.end()).max(),
70        }),
71    })
72}
73
74/// Deletes ALL static file jars for a given segment.
75///
76/// This is used for `PruneMode::Full` where all data should be removed, including the highest jar.
77/// Unlike [`prune_static_files`], this does not preserve the most recent jar.
78pub(crate) fn delete_static_files_segment<Provider>(
79    provider: &Provider,
80    input: PruneInput,
81    segment: StaticFileSegment,
82) -> Result<SegmentOutput, PrunerError>
83where
84    Provider: StaticFileProviderFactory,
85{
86    let deleted_headers = provider.static_file_provider().delete_segment(segment)?;
87
88    if deleted_headers.is_empty() {
89        return Ok(SegmentOutput::done())
90    }
91
92    let tx_ranges = deleted_headers.iter().filter_map(|header| header.tx_range());
93
94    let pruned = tx_ranges.clone().map(|range| range.len()).sum::<u64>() as usize;
95
96    Ok(SegmentOutput {
97        progress: PruneProgress::Finished,
98        pruned,
99        checkpoint: Some(SegmentOutputCheckpoint {
100            block_number: Some(input.to_block),
101            tx_number: tx_ranges.map(|range| range.end()).max(),
102        }),
103    })
104}
105
106/// A segment represents a pruning of some portion of the data.
107///
108/// Segments are called from [`Pruner`](crate::Pruner) with the following lifecycle:
109/// 1. Call [`Segment::prune`] with `delete_limit` of [`PruneInput`].
110/// 2. If [`Segment::prune`] returned a [`Some`] in `checkpoint` of [`SegmentOutput`], call
111///    [`Segment::save_checkpoint`].
112/// 3. Subtract `pruned` of [`SegmentOutput`] from `delete_limit` of next [`PruneInput`].
113pub trait Segment<Provider>: Debug + Send + Sync {
114    /// Segment of data that's pruned.
115    fn segment(&self) -> PruneSegment;
116
117    /// Prune mode with which the segment was initialized.
118    fn mode(&self) -> Option<PruneMode>;
119
120    /// Purpose of the segment.
121    fn purpose(&self) -> PrunePurpose;
122
123    /// Prune data for [`Self::segment`] using the provided input.
124    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError>;
125
126    /// Save checkpoint for [`Self::segment`] to the database.
127    fn save_checkpoint(
128        &self,
129        provider: &Provider,
130        checkpoint: PruneCheckpoint,
131    ) -> ProviderResult<()>
132    where
133        Provider: PruneCheckpointWriter,
134    {
135        provider.save_prune_checkpoint(self.segment(), checkpoint)
136    }
137
138    /// Returns the stage this segment depends on, if any.
139    ///
140    /// If this returns `Some(stage_id)`, the pruner will skip this segment if the stage
141    /// has not yet caught up with the `Finish` stage checkpoint.
142    fn required_stage(&self) -> Option<StageId> {
143        None
144    }
145}
146
147/// Segment pruning input, see [`Segment::prune`].
148#[derive(Debug)]
149#[cfg_attr(test, derive(Clone))]
150pub struct PruneInput {
151    pub(crate) previous_checkpoint: Option<PruneCheckpoint>,
152    /// Target block up to which the pruning needs to be done, inclusive.
153    pub(crate) to_block: BlockNumber,
154    /// Limits pruning of a segment.
155    pub(crate) limiter: PruneLimiter,
156}
157
158impl PruneInput {
159    /// Get next inclusive tx number range to prune according to the checkpoint and `to_block` block
160    /// number.
161    ///
162    /// To get the range start:
163    /// 1. If checkpoint exists, get next block body and return its first tx number.
164    /// 2. If checkpoint doesn't exist, return 0.
165    ///
166    /// To get the range end: get last tx number for `to_block`.
167    pub(crate) fn get_next_tx_num_range<Provider: BlockReader>(
168        &self,
169        provider: &Provider,
170    ) -> ProviderResult<Option<RangeInclusive<TxNumber>>> {
171        let from_tx_number = self.previous_checkpoint
172            // Checkpoint exists, prune from the next transaction after the highest pruned one
173            .and_then(|checkpoint| match checkpoint.tx_number {
174                Some(tx_number) => Some(tx_number + 1),
175                _ => {
176                    error!(target: "pruner", ?checkpoint, "Expected transaction number in prune checkpoint, found None");
177                    None
178                },
179            })
180            // No checkpoint exists, prune from genesis
181            .unwrap_or_default();
182
183        let to_tx_number = match provider.block_body_indices(self.to_block)? {
184            Some(body) => {
185                let last_tx = body.last_tx_num();
186                if last_tx + body.tx_count() == 0 {
187                    // Prevents a scenario where the pruner correctly starts at a finalized block,
188                    // but the first transaction (tx_num = 0) only appears on a non-finalized one.
189                    // Should only happen on a test/hive scenario.
190                    return Ok(None)
191                }
192                last_tx
193            }
194            None => return Ok(None),
195        };
196
197        let range = from_tx_number..=to_tx_number;
198        if range.is_empty() {
199            return Ok(None)
200        }
201
202        Ok(Some(range))
203    }
204
205    /// Get next inclusive block range to prune according to the checkpoint, `to_block` block
206    /// number and `limit`.
207    ///
208    /// To get the range start (`from_block`):
209    /// 1. If checkpoint exists, use next block.
210    /// 2. If checkpoint doesn't exist, use block 0.
211    ///
212    /// To get the range end: use block `to_block`.
213    pub(crate) fn get_next_block_range(&self) -> Option<RangeInclusive<BlockNumber>> {
214        let from_block = self.get_start_next_block_range();
215        let range = from_block..=self.to_block;
216        if range.is_empty() {
217            return None
218        }
219
220        Some(range)
221    }
222
223    /// Returns the start of the next block range.
224    ///
225    /// 1. If checkpoint exists, use next block.
226    /// 2. If checkpoint doesn't exist, use block 0.
227    pub(crate) fn get_start_next_block_range(&self) -> u64 {
228        self.previous_checkpoint
229            .and_then(|checkpoint| checkpoint.block_number)
230            // Checkpoint exists, prune from the next block after the highest pruned one
231            .map(|block_number| block_number + 1)
232            // No checkpoint exists, prune from genesis
233            .unwrap_or(0)
234    }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use reth_provider::{
        providers::BlockchainProvider,
        test_utils::{create_test_provider_factory, MockEthProvider},
        BlockWriter,
    };
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    /// Generates the random blocks `0..=10` (11 blocks) with `params`, inserts them into a fresh
    /// test database and returns the total number of generated transactions together with a
    /// provider reading from that database.
    fn provider_with_random_blocks(params: BlockRangeParams) -> (u64, impl BlockReader) {
        let mut rng = generators::rng();
        let factory = create_test_provider_factory();

        let blocks = random_block_range(&mut rng, 0..=10, params);

        // Insert the blocks into the database
        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
        for block in &blocks {
            provider_rw
                .insert_block(
                    &block.clone().try_recover().expect("failed to seal block with senders"),
                )
                .expect("failed to insert block");
        }
        provider_rw.commit().expect("failed to commit");

        // Calculate the total number of transactions
        let num_txs = blocks.iter().map(|block| block.transaction_count() as u64).sum::<u64>();

        let provider = BlockchainProvider::new(factory).unwrap();
        (num_txs, provider)
    }

    #[test]
    fn test_prune_input_get_next_tx_num_range_no_to_block() {
        let input = PruneInput {
            previous_checkpoint: None,
            to_block: 10,
            limiter: PruneLimiter::default(),
        };

        // Default provider with no block corresponding to block 10
        let provider = MockEthProvider::default();

        // No block body for block 10, expected None
        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
        assert!(range.is_none());
    }

    #[test]
    fn test_prune_input_get_next_tx_num_range_no_tx() {
        let input = PruneInput {
            previous_checkpoint: None,
            to_block: 10,
            limiter: PruneLimiter::default(),
        };

        // Generate blocks 0..=10 with no transactions
        let (_, provider) = provider_with_random_blocks(BlockRangeParams {
            parent: Some(B256::ZERO),
            tx_count: 0..1,
            ..Default::default()
        });

        // Since there are no transactions, expected None
        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
        assert!(range.is_none());
    }

    #[test]
    fn test_prune_input_get_next_tx_num_range_valid() {
        // Create a new prune input
        let input = PruneInput {
            previous_checkpoint: None,
            to_block: 10,
            limiter: PruneLimiter::default(),
        };

        // Generate blocks 0..=10 with some transactions
        let (num_txs, provider) = provider_with_random_blocks(BlockRangeParams {
            parent: Some(B256::ZERO),
            tx_count: 0..5,
            ..Default::default()
        });

        // Get the next tx number range
        let range = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap();

        assert_eq!(range, 0..=num_txs - 1);
    }

    #[test]
    fn test_prune_input_get_next_tx_checkpoint_without_tx_number() {
        // Create a prune input with a previous checkpoint without a tx number (unexpected)
        let input = PruneInput {
            previous_checkpoint: Some(PruneCheckpoint {
                block_number: Some(5),
                tx_number: None,
                prune_mode: PruneMode::Full,
            }),
            to_block: 10,
            limiter: PruneLimiter::default(),
        };

        // Generate blocks 0..=10 with some transactions
        let (num_txs, provider) = provider_with_random_blocks(BlockRangeParams {
            parent: Some(B256::ZERO),
            tx_count: 0..5,
            ..Default::default()
        });

        // A checkpoint without a tx number falls back to pruning from tx 0
        let range = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap();

        assert_eq!(range, 0..=num_txs - 1);
    }

    #[test]
    fn test_prune_input_get_next_tx_empty_range() {
        // Generate blocks 0..=10 with some transactions
        let (num_txs, provider) = provider_with_random_blocks(BlockRangeParams {
            parent: Some(B256::ZERO),
            tx_count: 0..5,
            ..Default::default()
        });

        // Last tx number among the generated blocks
        let max_range = num_txs - 1;

        // Create a prune input with a previous checkpoint that is the last tx number
        let input = PruneInput {
            previous_checkpoint: Some(PruneCheckpoint {
                block_number: Some(5),
                tx_number: Some(max_range),
                prune_mode: PruneMode::Full,
            }),
            to_block: 10,
            limiter: PruneLimiter::default(),
        };

        // We expect an empty range since the previous checkpoint is the last tx number
        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
        assert!(range.is_none());
    }
}