reth_prune/segments/
mod.rs

1mod receipts;
2mod set;
3mod static_file;
4mod user;
5
6use crate::{PruneLimiter, PrunerError};
7use alloy_primitives::{BlockNumber, TxNumber};
8use reth_provider::{errors::provider::ProviderResult, BlockReader, PruneCheckpointWriter};
9use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput};
10pub use set::SegmentSet;
11pub use static_file::{
12    Headers as StaticFileHeaders, Receipts as StaticFileReceipts,
13    Transactions as StaticFileTransactions,
14};
15use std::{fmt::Debug, ops::RangeInclusive};
16use tracing::error;
17pub use user::{
18    AccountHistory, Receipts as UserReceipts, ReceiptsByLogs, SenderRecovery, StorageHistory,
19    TransactionLookup,
20};
21
22/// A segment represents a pruning of some portion of the data.
23///
24/// Segments are called from [`Pruner`](crate::Pruner) with the following lifecycle:
25/// 1. Call [`Segment::prune`] with `delete_limit` of [`PruneInput`].
26/// 2. If [`Segment::prune`] returned a [`Some`] in `checkpoint` of [`SegmentOutput`], call
27///    [`Segment::save_checkpoint`].
28/// 3. Subtract `pruned` of [`SegmentOutput`] from `delete_limit` of next [`PruneInput`].
29pub trait Segment<Provider>: Debug + Send + Sync {
30    /// Segment of data that's pruned.
31    fn segment(&self) -> PruneSegment;
32
33    /// Prune mode with which the segment was initialized.
34    fn mode(&self) -> Option<PruneMode>;
35
36    /// Purpose of the segment.
37    fn purpose(&self) -> PrunePurpose;
38
39    /// Prune data for [`Self::segment`] using the provided input.
40    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError>;
41
42    /// Save checkpoint for [`Self::segment`] to the database.
43    fn save_checkpoint(
44        &self,
45        provider: &Provider,
46        checkpoint: PruneCheckpoint,
47    ) -> ProviderResult<()>
48    where
49        Provider: PruneCheckpointWriter,
50    {
51        provider.save_prune_checkpoint(self.segment(), checkpoint)
52    }
53}
54
55/// Segment pruning input, see [`Segment::prune`].
56#[derive(Debug)]
57#[cfg_attr(test, derive(Clone))]
58pub struct PruneInput {
59    pub(crate) previous_checkpoint: Option<PruneCheckpoint>,
60    /// Target block up to which the pruning needs to be done, inclusive.
61    pub(crate) to_block: BlockNumber,
62    /// Limits pruning of a segment.
63    pub(crate) limiter: PruneLimiter,
64}
65
66impl PruneInput {
67    /// Get next inclusive tx number range to prune according to the checkpoint and `to_block` block
68    /// number.
69    ///
70    /// To get the range start:
71    /// 1. If checkpoint exists, get next block body and return its first tx number.
72    /// 2. If checkpoint doesn't exist, return 0.
73    ///
74    /// To get the range end: get last tx number for `to_block`.
75    pub(crate) fn get_next_tx_num_range<Provider: BlockReader>(
76        &self,
77        provider: &Provider,
78    ) -> ProviderResult<Option<RangeInclusive<TxNumber>>> {
79        let from_tx_number = self.previous_checkpoint
80            // Checkpoint exists, prune from the next transaction after the highest pruned one
81            .and_then(|checkpoint| match checkpoint.tx_number {
82                Some(tx_number) => Some(tx_number + 1),
83                _ => {
84                    error!(target: "pruner", ?checkpoint, "Expected transaction number in prune checkpoint, found None");
85                    None
86                },
87            })
88            // No checkpoint exists, prune from genesis
89            .unwrap_or_default();
90
91        let to_tx_number = match provider.block_body_indices(self.to_block)? {
92            Some(body) => {
93                let last_tx = body.last_tx_num();
94                if last_tx + body.tx_count() == 0 {
95                    // Prevents a scenario where the pruner correctly starts at a finalized block,
96                    // but the first transaction (tx_num = 0) only appears on a non-finalized one.
97                    // Should only happen on a test/hive scenario.
98                    return Ok(None)
99                }
100                last_tx
101            }
102            None => return Ok(None),
103        };
104
105        let range = from_tx_number..=to_tx_number;
106        if range.is_empty() {
107            return Ok(None)
108        }
109
110        Ok(Some(range))
111    }
112
113    /// Get next inclusive block range to prune according to the checkpoint, `to_block` block
114    /// number and `limit`.
115    ///
116    /// To get the range start (`from_block`):
117    /// 1. If checkpoint exists, use next block.
118    /// 2. If checkpoint doesn't exist, use block 0.
119    ///
120    /// To get the range end: use block `to_block`.
121    pub(crate) fn get_next_block_range(&self) -> Option<RangeInclusive<BlockNumber>> {
122        let from_block = self.get_start_next_block_range();
123        let range = from_block..=self.to_block;
124        if range.is_empty() {
125            return None
126        }
127
128        Some(range)
129    }
130
131    /// Returns the start of the next block range.
132    ///
133    /// 1. If checkpoint exists, use next block.
134    /// 2. If checkpoint doesn't exist, use block 0.
135    pub(crate) fn get_start_next_block_range(&self) -> u64 {
136        self.previous_checkpoint
137            .and_then(|checkpoint| checkpoint.block_number)
138            // Checkpoint exists, prune from the next block after the highest pruned one
139            .map(|block_number| block_number + 1)
140            // No checkpoint exists, prune from genesis
141            .unwrap_or(0)
142    }
143}
144
145#[cfg(test)]
146mod tests {
147    use super::*;
148    use alloy_primitives::B256;
149    use reth_provider::{
150        providers::BlockchainProvider,
151        test_utils::{create_test_provider_factory, MockEthProvider},
152        BlockWriter,
153    };
154    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
155
156    #[test]
157    fn test_prune_input_get_next_tx_num_range_no_to_block() {
158        let input = PruneInput {
159            previous_checkpoint: None,
160            to_block: 10,
161            limiter: PruneLimiter::default(),
162        };
163
164        // Default provider with no block corresponding to block 10
165        let provider = MockEthProvider::default();
166
167        // No block body for block 10, expected None
168        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
169        assert!(range.is_none());
170    }
171
172    #[test]
173    fn test_prune_input_get_next_tx_num_range_no_tx() {
174        let input = PruneInput {
175            previous_checkpoint: None,
176            to_block: 10,
177            limiter: PruneLimiter::default(),
178        };
179
180        let mut rng = generators::rng();
181        let factory = create_test_provider_factory();
182
183        // Generate 10 random blocks with no transactions
184        let blocks = random_block_range(
185            &mut rng,
186            0..=10,
187            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
188        );
189
190        // Insert the blocks into the database
191        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
192        for block in &blocks {
193            provider_rw
194                .insert_block(
195                    block.clone().try_recover().expect("failed to seal block with senders"),
196                )
197                .expect("failed to insert block");
198        }
199        provider_rw.commit().expect("failed to commit");
200
201        // Create a new provider
202        let provider = BlockchainProvider::new(factory).unwrap();
203
204        // Since there are no transactions, expected None
205        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
206        assert!(range.is_none());
207    }
208
209    #[test]
210    fn test_prune_input_get_next_tx_num_range_valid() {
211        // Create a new prune input
212        let input = PruneInput {
213            previous_checkpoint: None,
214            to_block: 10,
215            limiter: PruneLimiter::default(),
216        };
217
218        let mut rng = generators::rng();
219        let factory = create_test_provider_factory();
220
221        // Generate 10 random blocks with some transactions
222        let blocks = random_block_range(
223            &mut rng,
224            0..=10,
225            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..5, ..Default::default() },
226        );
227
228        // Insert the blocks into the database
229        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
230        for block in &blocks {
231            provider_rw
232                .insert_block(
233                    block.clone().try_recover().expect("failed to seal block with senders"),
234                )
235                .expect("failed to insert block");
236        }
237        provider_rw.commit().expect("failed to commit");
238
239        // Create a new provider
240        let provider = BlockchainProvider::new(factory).unwrap();
241
242        // Get the next tx number range
243        let range = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap();
244
245        // Calculate the total number of transactions
246        let num_txs = blocks.iter().map(|block| block.transaction_count() as u64).sum::<u64>();
247
248        assert_eq!(range, 0..=num_txs - 1);
249    }
250
251    #[test]
252    fn test_prune_input_get_next_tx_checkpoint_without_tx_number() {
253        // Create a prune input with a previous checkpoint without a tx number (unexpected)
254        let input = PruneInput {
255            previous_checkpoint: Some(PruneCheckpoint {
256                block_number: Some(5),
257                tx_number: None,
258                prune_mode: PruneMode::Full,
259            }),
260            to_block: 10,
261            limiter: PruneLimiter::default(),
262        };
263
264        let mut rng = generators::rng();
265        let factory = create_test_provider_factory();
266
267        // Generate 10 random blocks
268        let blocks = random_block_range(
269            &mut rng,
270            0..=10,
271            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..5, ..Default::default() },
272        );
273
274        // Insert the blocks into the database
275        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
276        for block in &blocks {
277            provider_rw
278                .insert_block(
279                    block.clone().try_recover().expect("failed to seal block with senders"),
280                )
281                .expect("failed to insert block");
282        }
283        provider_rw.commit().expect("failed to commit");
284
285        // Create a new provider
286        let provider = BlockchainProvider::new(factory).unwrap();
287
288        // Fetch the range and check if it is correct
289        let range = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap();
290
291        // Calculate the total number of transactions
292        let num_txs = blocks.iter().map(|block| block.transaction_count() as u64).sum::<u64>();
293
294        assert_eq!(range, 0..=num_txs - 1,);
295    }
296
297    #[test]
298    fn test_prune_input_get_next_tx_empty_range() {
299        // Create a new provider via factory
300        let mut rng = generators::rng();
301        let factory = create_test_provider_factory();
302
303        // Generate 10 random blocks
304        let blocks = random_block_range(
305            &mut rng,
306            0..=10,
307            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..5, ..Default::default() },
308        );
309
310        // Insert the blocks into the database
311        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
312        for block in &blocks {
313            provider_rw
314                .insert_block(
315                    block.clone().try_recover().expect("failed to seal block with senders"),
316                )
317                .expect("failed to insert block");
318        }
319        provider_rw.commit().expect("failed to commit");
320
321        // Create a new provider
322        let provider = BlockchainProvider::new(factory).unwrap();
323
324        // Get the last tx number
325        // Calculate the total number of transactions
326        let num_txs = blocks.iter().map(|block| block.transaction_count() as u64).sum::<u64>();
327        let max_range = num_txs - 1;
328
329        // Create a prune input with a previous checkpoint that is the last tx number
330        let input = PruneInput {
331            previous_checkpoint: Some(PruneCheckpoint {
332                block_number: Some(5),
333                tx_number: Some(max_range),
334                prune_mode: PruneMode::Full,
335            }),
336            to_block: 10,
337            limiter: PruneLimiter::default(),
338        };
339
340        // We expect an empty range since the previous checkpoint is the last tx number
341        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
342        assert!(range.is_none());
343    }
344}