reth_prune/segments/
mod.rs

1mod receipts;
2mod set;
3mod user;
4
5use crate::{PruneLimiter, PrunerError};
6use alloy_primitives::{BlockNumber, TxNumber};
7use reth_provider::{
8    errors::provider::ProviderResult, BlockReader, PruneCheckpointWriter, StaticFileProviderFactory,
9};
10use reth_prune_types::{
11    PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput,
12    SegmentOutputCheckpoint,
13};
14use reth_static_file_types::StaticFileSegment;
15pub use set::SegmentSet;
16use std::{fmt::Debug, ops::RangeInclusive};
17use tracing::error;
18pub use user::{
19    AccountHistory, Bodies, MerkleChangeSets, Receipts as UserReceipts, ReceiptsByLogs,
20    SenderRecovery, StorageHistory, TransactionLookup,
21};
22
23/// Prunes data from static files for a given segment.
24///
25/// This is a generic helper function used by both receipts and bodies pruning
26/// when data is stored in static files.
27pub(crate) fn prune_static_files<Provider>(
28    provider: &Provider,
29    input: PruneInput,
30    segment: StaticFileSegment,
31) -> Result<SegmentOutput, PrunerError>
32where
33    Provider: StaticFileProviderFactory,
34{
35    let deleted_headers =
36        provider.static_file_provider().delete_segment_below_block(segment, input.to_block + 1)?;
37
38    if deleted_headers.is_empty() {
39        return Ok(SegmentOutput::done())
40    }
41
42    let tx_ranges = deleted_headers.iter().filter_map(|header| header.tx_range());
43
44    let pruned = tx_ranges.clone().map(|range| range.len()).sum::<u64>() as usize;
45
46    Ok(SegmentOutput {
47        progress: PruneProgress::Finished,
48        pruned,
49        checkpoint: Some(SegmentOutputCheckpoint {
50            block_number: Some(input.to_block),
51            tx_number: tx_ranges.map(|range| range.end()).max(),
52        }),
53    })
54}
55
56/// A segment represents a pruning of some portion of the data.
57///
58/// Segments are called from [`Pruner`](crate::Pruner) with the following lifecycle:
59/// 1. Call [`Segment::prune`] with `delete_limit` of [`PruneInput`].
60/// 2. If [`Segment::prune`] returned a [`Some`] in `checkpoint` of [`SegmentOutput`], call
61///    [`Segment::save_checkpoint`].
62/// 3. Subtract `pruned` of [`SegmentOutput`] from `delete_limit` of next [`PruneInput`].
63pub trait Segment<Provider>: Debug + Send + Sync {
64    /// Segment of data that's pruned.
65    fn segment(&self) -> PruneSegment;
66
67    /// Prune mode with which the segment was initialized.
68    fn mode(&self) -> Option<PruneMode>;
69
70    /// Purpose of the segment.
71    fn purpose(&self) -> PrunePurpose;
72
73    /// Prune data for [`Self::segment`] using the provided input.
74    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError>;
75
76    /// Save checkpoint for [`Self::segment`] to the database.
77    fn save_checkpoint(
78        &self,
79        provider: &Provider,
80        checkpoint: PruneCheckpoint,
81    ) -> ProviderResult<()>
82    where
83        Provider: PruneCheckpointWriter,
84    {
85        provider.save_prune_checkpoint(self.segment(), checkpoint)
86    }
87}
88
89/// Segment pruning input, see [`Segment::prune`].
90#[derive(Debug)]
91#[cfg_attr(test, derive(Clone))]
92pub struct PruneInput {
93    pub(crate) previous_checkpoint: Option<PruneCheckpoint>,
94    /// Target block up to which the pruning needs to be done, inclusive.
95    pub(crate) to_block: BlockNumber,
96    /// Limits pruning of a segment.
97    pub(crate) limiter: PruneLimiter,
98}
99
100impl PruneInput {
101    /// Get next inclusive tx number range to prune according to the checkpoint and `to_block` block
102    /// number.
103    ///
104    /// To get the range start:
105    /// 1. If checkpoint exists, get next block body and return its first tx number.
106    /// 2. If checkpoint doesn't exist, return 0.
107    ///
108    /// To get the range end: get last tx number for `to_block`.
109    pub(crate) fn get_next_tx_num_range<Provider: BlockReader>(
110        &self,
111        provider: &Provider,
112    ) -> ProviderResult<Option<RangeInclusive<TxNumber>>> {
113        let from_tx_number = self.previous_checkpoint
114            // Checkpoint exists, prune from the next transaction after the highest pruned one
115            .and_then(|checkpoint| match checkpoint.tx_number {
116                Some(tx_number) => Some(tx_number + 1),
117                _ => {
118                    error!(target: "pruner", ?checkpoint, "Expected transaction number in prune checkpoint, found None");
119                    None
120                },
121            })
122            // No checkpoint exists, prune from genesis
123            .unwrap_or_default();
124
125        let to_tx_number = match provider.block_body_indices(self.to_block)? {
126            Some(body) => {
127                let last_tx = body.last_tx_num();
128                if last_tx + body.tx_count() == 0 {
129                    // Prevents a scenario where the pruner correctly starts at a finalized block,
130                    // but the first transaction (tx_num = 0) only appears on a non-finalized one.
131                    // Should only happen on a test/hive scenario.
132                    return Ok(None)
133                }
134                last_tx
135            }
136            None => return Ok(None),
137        };
138
139        let range = from_tx_number..=to_tx_number;
140        if range.is_empty() {
141            return Ok(None)
142        }
143
144        Ok(Some(range))
145    }
146
147    /// Get next inclusive block range to prune according to the checkpoint, `to_block` block
148    /// number and `limit`.
149    ///
150    /// To get the range start (`from_block`):
151    /// 1. If checkpoint exists, use next block.
152    /// 2. If checkpoint doesn't exist, use block 0.
153    ///
154    /// To get the range end: use block `to_block`.
155    pub(crate) fn get_next_block_range(&self) -> Option<RangeInclusive<BlockNumber>> {
156        let from_block = self.get_start_next_block_range();
157        let range = from_block..=self.to_block;
158        if range.is_empty() {
159            return None
160        }
161
162        Some(range)
163    }
164
165    /// Returns the start of the next block range.
166    ///
167    /// 1. If checkpoint exists, use next block.
168    /// 2. If checkpoint doesn't exist, use block 0.
169    pub(crate) fn get_start_next_block_range(&self) -> u64 {
170        self.previous_checkpoint
171            .and_then(|checkpoint| checkpoint.block_number)
172            // Checkpoint exists, prune from the next block after the highest pruned one
173            .map(|block_number| block_number + 1)
174            // No checkpoint exists, prune from genesis
175            .unwrap_or(0)
176    }
177}
178
179#[cfg(test)]
180mod tests {
181    use super::*;
182    use alloy_primitives::B256;
183    use reth_provider::{
184        providers::BlockchainProvider,
185        test_utils::{create_test_provider_factory, MockEthProvider},
186        BlockWriter,
187    };
188    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
189
190    #[test]
191    fn test_prune_input_get_next_tx_num_range_no_to_block() {
192        let input = PruneInput {
193            previous_checkpoint: None,
194            to_block: 10,
195            limiter: PruneLimiter::default(),
196        };
197
198        // Default provider with no block corresponding to block 10
199        let provider = MockEthProvider::default();
200
201        // No block body for block 10, expected None
202        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
203        assert!(range.is_none());
204    }
205
206    #[test]
207    fn test_prune_input_get_next_tx_num_range_no_tx() {
208        let input = PruneInput {
209            previous_checkpoint: None,
210            to_block: 10,
211            limiter: PruneLimiter::default(),
212        };
213
214        let mut rng = generators::rng();
215        let factory = create_test_provider_factory();
216
217        // Generate 10 random blocks with no transactions
218        let blocks = random_block_range(
219            &mut rng,
220            0..=10,
221            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
222        );
223
224        // Insert the blocks into the database
225        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
226        for block in &blocks {
227            provider_rw
228                .insert_block(
229                    block.clone().try_recover().expect("failed to seal block with senders"),
230                )
231                .expect("failed to insert block");
232        }
233        provider_rw.commit().expect("failed to commit");
234
235        // Create a new provider
236        let provider = BlockchainProvider::new(factory).unwrap();
237
238        // Since there are no transactions, expected None
239        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
240        assert!(range.is_none());
241    }
242
243    #[test]
244    fn test_prune_input_get_next_tx_num_range_valid() {
245        // Create a new prune input
246        let input = PruneInput {
247            previous_checkpoint: None,
248            to_block: 10,
249            limiter: PruneLimiter::default(),
250        };
251
252        let mut rng = generators::rng();
253        let factory = create_test_provider_factory();
254
255        // Generate 10 random blocks with some transactions
256        let blocks = random_block_range(
257            &mut rng,
258            0..=10,
259            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..5, ..Default::default() },
260        );
261
262        // Insert the blocks into the database
263        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
264        for block in &blocks {
265            provider_rw
266                .insert_block(
267                    block.clone().try_recover().expect("failed to seal block with senders"),
268                )
269                .expect("failed to insert block");
270        }
271        provider_rw.commit().expect("failed to commit");
272
273        // Create a new provider
274        let provider = BlockchainProvider::new(factory).unwrap();
275
276        // Get the next tx number range
277        let range = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap();
278
279        // Calculate the total number of transactions
280        let num_txs = blocks.iter().map(|block| block.transaction_count() as u64).sum::<u64>();
281
282        assert_eq!(range, 0..=num_txs - 1);
283    }
284
285    #[test]
286    fn test_prune_input_get_next_tx_checkpoint_without_tx_number() {
287        // Create a prune input with a previous checkpoint without a tx number (unexpected)
288        let input = PruneInput {
289            previous_checkpoint: Some(PruneCheckpoint {
290                block_number: Some(5),
291                tx_number: None,
292                prune_mode: PruneMode::Full,
293            }),
294            to_block: 10,
295            limiter: PruneLimiter::default(),
296        };
297
298        let mut rng = generators::rng();
299        let factory = create_test_provider_factory();
300
301        // Generate 10 random blocks
302        let blocks = random_block_range(
303            &mut rng,
304            0..=10,
305            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..5, ..Default::default() },
306        );
307
308        // Insert the blocks into the database
309        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
310        for block in &blocks {
311            provider_rw
312                .insert_block(
313                    block.clone().try_recover().expect("failed to seal block with senders"),
314                )
315                .expect("failed to insert block");
316        }
317        provider_rw.commit().expect("failed to commit");
318
319        // Create a new provider
320        let provider = BlockchainProvider::new(factory).unwrap();
321
322        // Fetch the range and check if it is correct
323        let range = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap();
324
325        // Calculate the total number of transactions
326        let num_txs = blocks.iter().map(|block| block.transaction_count() as u64).sum::<u64>();
327
328        assert_eq!(range, 0..=num_txs - 1,);
329    }
330
331    #[test]
332    fn test_prune_input_get_next_tx_empty_range() {
333        // Create a new provider via factory
334        let mut rng = generators::rng();
335        let factory = create_test_provider_factory();
336
337        // Generate 10 random blocks
338        let blocks = random_block_range(
339            &mut rng,
340            0..=10,
341            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..5, ..Default::default() },
342        );
343
344        // Insert the blocks into the database
345        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
346        for block in &blocks {
347            provider_rw
348                .insert_block(
349                    block.clone().try_recover().expect("failed to seal block with senders"),
350                )
351                .expect("failed to insert block");
352        }
353        provider_rw.commit().expect("failed to commit");
354
355        // Create a new provider
356        let provider = BlockchainProvider::new(factory).unwrap();
357
358        // Get the last tx number
359        // Calculate the total number of transactions
360        let num_txs = blocks.iter().map(|block| block.transaction_count() as u64).sum::<u64>();
361        let max_range = num_txs - 1;
362
363        // Create a prune input with a previous checkpoint that is the last tx number
364        let input = PruneInput {
365            previous_checkpoint: Some(PruneCheckpoint {
366                block_number: Some(5),
367                tx_number: Some(max_range),
368                prune_mode: PruneMode::Full,
369            }),
370            to_block: 10,
371            limiter: PruneLimiter::default(),
372        };
373
374        // We expect an empty range since the previous checkpoint is the last tx number
375        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
376        assert!(range.is_none());
377    }
378}