reth_prune/segments/
mod.rs

1mod receipts;
2mod set;
3mod user;
4
5use crate::{PruneLimiter, PrunerError};
6use alloy_primitives::{BlockNumber, TxNumber};
7use reth_provider::{
8    errors::provider::ProviderResult, BlockReader, PruneCheckpointWriter, StaticFileProviderFactory,
9};
10use reth_prune_types::{
11    PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput,
12    SegmentOutputCheckpoint,
13};
14use reth_stages_types::StageId;
15use reth_static_file_types::StaticFileSegment;
16pub use set::SegmentSet;
17use std::{fmt::Debug, ops::RangeInclusive};
18use tracing::error;
19pub use user::{
20    AccountHistory, Bodies, MerkleChangeSets, Receipts as UserReceipts, ReceiptsByLogs,
21    SenderRecovery, StorageHistory, TransactionLookup,
22};
23
24/// Prunes data from static files for a given segment.
25///
26/// This is a generic helper function used by both receipts and bodies pruning
27/// when data is stored in static files.
28pub(crate) fn prune_static_files<Provider>(
29    provider: &Provider,
30    input: PruneInput,
31    segment: StaticFileSegment,
32) -> Result<SegmentOutput, PrunerError>
33where
34    Provider: StaticFileProviderFactory,
35{
36    let deleted_headers =
37        provider.static_file_provider().delete_segment_below_block(segment, input.to_block + 1)?;
38
39    if deleted_headers.is_empty() {
40        return Ok(SegmentOutput::done())
41    }
42
43    let tx_ranges = deleted_headers.iter().filter_map(|header| header.tx_range());
44
45    let pruned = tx_ranges.clone().map(|range| range.len()).sum::<u64>() as usize;
46
47    Ok(SegmentOutput {
48        progress: PruneProgress::Finished,
49        pruned,
50        checkpoint: Some(SegmentOutputCheckpoint {
51            block_number: Some(input.to_block),
52            tx_number: tx_ranges.map(|range| range.end()).max(),
53        }),
54    })
55}
56
57/// A segment represents a pruning of some portion of the data.
58///
59/// Segments are called from [`Pruner`](crate::Pruner) with the following lifecycle:
60/// 1. Call [`Segment::prune`] with `delete_limit` of [`PruneInput`].
61/// 2. If [`Segment::prune`] returned a [`Some`] in `checkpoint` of [`SegmentOutput`], call
62///    [`Segment::save_checkpoint`].
63/// 3. Subtract `pruned` of [`SegmentOutput`] from `delete_limit` of next [`PruneInput`].
64pub trait Segment<Provider>: Debug + Send + Sync {
65    /// Segment of data that's pruned.
66    fn segment(&self) -> PruneSegment;
67
68    /// Prune mode with which the segment was initialized.
69    fn mode(&self) -> Option<PruneMode>;
70
71    /// Purpose of the segment.
72    fn purpose(&self) -> PrunePurpose;
73
74    /// Prune data for [`Self::segment`] using the provided input.
75    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError>;
76
77    /// Save checkpoint for [`Self::segment`] to the database.
78    fn save_checkpoint(
79        &self,
80        provider: &Provider,
81        checkpoint: PruneCheckpoint,
82    ) -> ProviderResult<()>
83    where
84        Provider: PruneCheckpointWriter,
85    {
86        provider.save_prune_checkpoint(self.segment(), checkpoint)
87    }
88
89    /// Returns the stage this segment depends on, if any.
90    ///
91    /// If this returns `Some(stage_id)`, the pruner will skip this segment if the stage
92    /// has not yet caught up with the `Finish` stage checkpoint.
93    fn required_stage(&self) -> Option<StageId> {
94        None
95    }
96}
97
98/// Segment pruning input, see [`Segment::prune`].
99#[derive(Debug)]
100#[cfg_attr(test, derive(Clone))]
101pub struct PruneInput {
102    pub(crate) previous_checkpoint: Option<PruneCheckpoint>,
103    /// Target block up to which the pruning needs to be done, inclusive.
104    pub(crate) to_block: BlockNumber,
105    /// Limits pruning of a segment.
106    pub(crate) limiter: PruneLimiter,
107}
108
109impl PruneInput {
110    /// Get next inclusive tx number range to prune according to the checkpoint and `to_block` block
111    /// number.
112    ///
113    /// To get the range start:
114    /// 1. If checkpoint exists, get next block body and return its first tx number.
115    /// 2. If checkpoint doesn't exist, return 0.
116    ///
117    /// To get the range end: get last tx number for `to_block`.
118    pub(crate) fn get_next_tx_num_range<Provider: BlockReader>(
119        &self,
120        provider: &Provider,
121    ) -> ProviderResult<Option<RangeInclusive<TxNumber>>> {
122        let from_tx_number = self.previous_checkpoint
123            // Checkpoint exists, prune from the next transaction after the highest pruned one
124            .and_then(|checkpoint| match checkpoint.tx_number {
125                Some(tx_number) => Some(tx_number + 1),
126                _ => {
127                    error!(target: "pruner", ?checkpoint, "Expected transaction number in prune checkpoint, found None");
128                    None
129                },
130            })
131            // No checkpoint exists, prune from genesis
132            .unwrap_or_default();
133
134        let to_tx_number = match provider.block_body_indices(self.to_block)? {
135            Some(body) => {
136                let last_tx = body.last_tx_num();
137                if last_tx + body.tx_count() == 0 {
138                    // Prevents a scenario where the pruner correctly starts at a finalized block,
139                    // but the first transaction (tx_num = 0) only appears on a non-finalized one.
140                    // Should only happen on a test/hive scenario.
141                    return Ok(None)
142                }
143                last_tx
144            }
145            None => return Ok(None),
146        };
147
148        let range = from_tx_number..=to_tx_number;
149        if range.is_empty() {
150            return Ok(None)
151        }
152
153        Ok(Some(range))
154    }
155
156    /// Get next inclusive block range to prune according to the checkpoint, `to_block` block
157    /// number and `limit`.
158    ///
159    /// To get the range start (`from_block`):
160    /// 1. If checkpoint exists, use next block.
161    /// 2. If checkpoint doesn't exist, use block 0.
162    ///
163    /// To get the range end: use block `to_block`.
164    pub(crate) fn get_next_block_range(&self) -> Option<RangeInclusive<BlockNumber>> {
165        let from_block = self.get_start_next_block_range();
166        let range = from_block..=self.to_block;
167        if range.is_empty() {
168            return None
169        }
170
171        Some(range)
172    }
173
174    /// Returns the start of the next block range.
175    ///
176    /// 1. If checkpoint exists, use next block.
177    /// 2. If checkpoint doesn't exist, use block 0.
178    pub(crate) fn get_start_next_block_range(&self) -> u64 {
179        self.previous_checkpoint
180            .and_then(|checkpoint| checkpoint.block_number)
181            // Checkpoint exists, prune from the next block after the highest pruned one
182            .map(|block_number| block_number + 1)
183            // No checkpoint exists, prune from genesis
184            .unwrap_or(0)
185    }
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use reth_provider::{
        providers::BlockchainProvider,
        test_utils::{create_test_provider_factory, MockEthProvider},
        BlockWriter,
    };
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    /// Generates blocks `0..=10` whose per-block transaction count is drawn from `$tx_count`,
    /// inserts and commits them into a fresh test database, and evaluates to
    /// `(blocks, provider)`.
    macro_rules! populated_chain {
        ($tx_count:expr) => {{
            let mut rng = generators::rng();
            let factory = create_test_provider_factory();

            let blocks = random_block_range(
                &mut rng,
                0..=10,
                BlockRangeParams {
                    parent: Some(B256::ZERO),
                    tx_count: $tx_count,
                    ..Default::default()
                },
            );

            let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
            blocks.iter().for_each(|block| {
                let recovered =
                    block.clone().try_recover().expect("failed to seal block with senders");
                provider_rw.insert_block(recovered).expect("failed to insert block");
            });
            provider_rw.commit().expect("failed to commit");

            (blocks, BlockchainProvider::new(factory).unwrap())
        }};
    }

    /// Prune input targeting block 10 with the given previous checkpoint and a default limiter.
    fn input_to_block_10(previous_checkpoint: Option<PruneCheckpoint>) -> PruneInput {
        PruneInput { previous_checkpoint, to_block: 10, limiter: PruneLimiter::default() }
    }

    #[test]
    fn test_prune_input_get_next_tx_num_range_no_to_block() {
        let input = input_to_block_10(None);

        // An empty mock provider has no body indices for block 10, so there is no range.
        let provider = MockEthProvider::default();

        let next = input.get_next_tx_num_range(&provider).expect("Expected range");
        assert_eq!(next, None);
    }

    #[test]
    fn test_prune_input_get_next_tx_num_range_no_tx() {
        let input = input_to_block_10(None);

        // Every generated block is empty.
        let (_blocks, provider) = populated_chain!(0..1);

        // No transaction was ever included, so there is nothing to prune.
        let next = input.get_next_tx_num_range(&provider).expect("Expected range");
        assert_eq!(next, None);
    }

    #[test]
    fn test_prune_input_get_next_tx_num_range_valid() {
        let input = input_to_block_10(None);

        let (blocks, provider) = populated_chain!(0..5);

        let next = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap();

        // Without a checkpoint the range spans every transaction from genesis to block 10.
        let total_txs =
            blocks.iter().fold(0u64, |acc, block| acc + block.transaction_count() as u64);
        assert_eq!(next, 0..=total_txs - 1);
    }

    #[test]
    fn test_prune_input_get_next_tx_checkpoint_without_tx_number() {
        // A checkpoint lacking a tx number is unexpected; the range falls back to genesis.
        let input = input_to_block_10(Some(PruneCheckpoint {
            block_number: Some(5),
            tx_number: None,
            prune_mode: PruneMode::Full,
        }));

        let (blocks, provider) = populated_chain!(0..5);

        let next = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap();

        let total_txs =
            blocks.iter().fold(0u64, |acc, block| acc + block.transaction_count() as u64);
        assert_eq!(next, 0..=total_txs - 1);
    }

    #[test]
    fn test_prune_input_get_next_tx_empty_range() {
        let (blocks, provider) = populated_chain!(0..5);

        let total_txs =
            blocks.iter().fold(0u64, |acc, block| acc + block.transaction_count() as u64);
        let last_tx = total_txs - 1;

        // The checkpoint already covers the last transaction, so nothing is left to prune.
        let input = input_to_block_10(Some(PruneCheckpoint {
            block_number: Some(5),
            tx_number: Some(last_tx),
            prune_mode: PruneMode::Full,
        }));

        let next = input.get_next_tx_num_range(&provider).expect("Expected range");
        assert_eq!(next, None);
    }
}