reth_prune/segments/
mod.rs

1mod receipts;
2mod set;
3mod static_file;
4mod user;
5
6use crate::{PruneLimiter, PrunerError};
7use alloy_primitives::{BlockNumber, TxNumber};
8use reth_provider::{errors::provider::ProviderResult, BlockReader, PruneCheckpointWriter};
9use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput};
10pub use set::SegmentSet;
11pub use static_file::{
12    Headers as StaticFileHeaders, Receipts as StaticFileReceipts,
13    Transactions as StaticFileTransactions,
14};
15use std::{fmt::Debug, ops::RangeInclusive};
16use tracing::error;
17pub use user::{
18    AccountHistory, Receipts as UserReceipts, ReceiptsByLogs, SenderRecovery, StorageHistory,
19    TransactionLookup,
20};
21
22/// A segment represents a pruning of some portion of the data.
23///
24/// Segments are called from [`Pruner`](crate::Pruner) with the following lifecycle:
25/// 1. Call [`Segment::prune`] with `delete_limit` of [`PruneInput`].
26/// 2. If [`Segment::prune`] returned a [`Some`] in `checkpoint` of [`SegmentOutput`], call
27///    [`Segment::save_checkpoint`].
28/// 3. Subtract `pruned` of [`SegmentOutput`] from `delete_limit` of next [`PruneInput`].
29pub trait Segment<Provider>: Debug + Send + Sync {
30    /// Segment of data that's pruned.
31    fn segment(&self) -> PruneSegment;
32
33    /// Prune mode with which the segment was initialized.
34    fn mode(&self) -> Option<PruneMode>;
35
36    /// Purpose of the segment.
37    fn purpose(&self) -> PrunePurpose;
38
39    /// Prune data for [`Self::segment`] using the provided input.
40    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError>;
41
42    /// Save checkpoint for [`Self::segment`] to the database.
43    fn save_checkpoint(
44        &self,
45        provider: &Provider,
46        checkpoint: PruneCheckpoint,
47    ) -> ProviderResult<()>
48    where
49        Provider: PruneCheckpointWriter,
50    {
51        provider.save_prune_checkpoint(self.segment(), checkpoint)
52    }
53}
54
55/// Segment pruning input, see [`Segment::prune`].
56#[derive(Debug)]
57#[cfg_attr(test, derive(Clone))]
58pub struct PruneInput {
59    pub(crate) previous_checkpoint: Option<PruneCheckpoint>,
60    /// Target block up to which the pruning needs to be done, inclusive.
61    pub(crate) to_block: BlockNumber,
62    /// Limits pruning of a segment.
63    pub(crate) limiter: PruneLimiter,
64}
65
66impl PruneInput {
67    /// Get next inclusive tx number range to prune according to the checkpoint and `to_block` block
68    /// number.
69    ///
70    /// To get the range start:
71    /// 1. If checkpoint exists, get next block body and return its first tx number.
72    /// 2. If checkpoint doesn't exist, return 0.
73    ///
74    /// To get the range end: get last tx number for `to_block`.
75    pub(crate) fn get_next_tx_num_range<Provider: BlockReader>(
76        &self,
77        provider: &Provider,
78    ) -> ProviderResult<Option<RangeInclusive<TxNumber>>> {
79        let from_tx_number = self.previous_checkpoint
80            // Checkpoint exists, prune from the next transaction after the highest pruned one
81            .and_then(|checkpoint| match checkpoint.tx_number {
82                Some(tx_number) => Some(tx_number + 1),
83                _ => {
84                    error!(target: "pruner", ?checkpoint, "Expected transaction number in prune checkpoint, found None");
85                    None
86                },
87            })
88            // No checkpoint exists, prune from genesis
89            .unwrap_or_default();
90
91        let to_tx_number = match provider.block_body_indices(self.to_block)? {
92            Some(body) => {
93                let last_tx = body.last_tx_num();
94                if last_tx + body.tx_count() == 0 {
95                    // Prevents a scenario where the pruner correctly starts at a finalized block,
96                    // but the first transaction (tx_num = 0) only appears on a non-finalized one.
97                    // Should only happen on a test/hive scenario.
98                    return Ok(None)
99                }
100                last_tx
101            }
102            None => return Ok(None),
103        };
104
105        let range = from_tx_number..=to_tx_number;
106        if range.is_empty() {
107            return Ok(None)
108        }
109
110        Ok(Some(range))
111    }
112
113    /// Get next inclusive block range to prune according to the checkpoint, `to_block` block
114    /// number and `limit`.
115    ///
116    /// To get the range start (`from_block`):
117    /// 1. If checkpoint exists, use next block.
118    /// 2. If checkpoint doesn't exist, use block 0.
119    ///
120    /// To get the range end: use block `to_block`.
121    pub(crate) fn get_next_block_range(&self) -> Option<RangeInclusive<BlockNumber>> {
122        let from_block = self.get_start_next_block_range();
123        let range = from_block..=self.to_block;
124        if range.is_empty() {
125            return None
126        }
127
128        Some(range)
129    }
130
131    /// Returns the start of the next block range.
132    ///
133    /// 1. If checkpoint exists, use next block.
134    /// 2. If checkpoint doesn't exist, use block 0.
135    pub(crate) fn get_start_next_block_range(&self) -> u64 {
136        self.previous_checkpoint
137            .and_then(|checkpoint| checkpoint.block_number)
138            // Checkpoint exists, prune from the next block after the highest pruned one
139            .map(|block_number| block_number + 1)
140            // No checkpoint exists, prune from genesis
141            .unwrap_or(0)
142    }
143}
144
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use reth_provider::{
        providers::BlockchainProvider,
        test_utils::{create_test_provider_factory, MockEthProvider},
    };
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    /// Shared fixture: generates random blocks `0..=10` (11 blocks) with the given parameters,
    /// inserts and commits them into a fresh test database, and returns a provider over that
    /// database together with the total number of transactions across the generated blocks.
    fn provider_with_random_blocks(params: BlockRangeParams) -> (impl BlockReader, u64) {
        let mut rng = generators::rng();
        let factory = create_test_provider_factory();

        let blocks = random_block_range(&mut rng, 0..=10, params);

        // Insert the blocks into the database
        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
        for block in &blocks {
            provider_rw
                .insert_historical_block(
                    block.clone().try_recover().expect("failed to seal block with senders"),
                )
                .expect("failed to insert block");
        }
        provider_rw.commit().expect("failed to commit");

        // Calculate the total number of transactions
        let num_txs = blocks.iter().map(|block| block.transaction_count() as u64).sum::<u64>();

        (BlockchainProvider::new(factory).unwrap(), num_txs)
    }

    #[test]
    fn test_prune_input_get_next_tx_num_range_no_to_block() {
        let input = PruneInput {
            previous_checkpoint: None,
            to_block: 10,
            limiter: PruneLimiter::default(),
        };

        // Default provider with no block corresponding to block 10
        let provider = MockEthProvider::default();

        // No block body for block 10, expected None
        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
        assert!(range.is_none());
    }

    #[test]
    fn test_prune_input_get_next_tx_num_range_no_tx() {
        let input = PruneInput {
            previous_checkpoint: None,
            to_block: 10,
            limiter: PruneLimiter::default(),
        };

        // Blocks 0..=10 with no transactions (`0..1` can only yield a count of 0)
        let (provider, _num_txs) = provider_with_random_blocks(BlockRangeParams {
            parent: Some(B256::ZERO),
            tx_count: 0..1,
            ..Default::default()
        });

        // Since there are no transactions, expected None
        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
        assert!(range.is_none());
    }

    #[test]
    fn test_prune_input_get_next_tx_num_range_valid() {
        // Create a new prune input
        let input = PruneInput {
            previous_checkpoint: None,
            to_block: 10,
            limiter: PruneLimiter::default(),
        };

        // Blocks 0..=10 with some transactions
        let (provider, num_txs) = provider_with_random_blocks(BlockRangeParams {
            parent: Some(B256::ZERO),
            tx_count: 0..5,
            ..Default::default()
        });

        // Get the next tx number range
        let range = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap();

        assert_eq!(range, 0..=num_txs - 1);
    }

    #[test]
    fn test_prune_input_get_next_tx_checkpoint_without_tx_number() {
        // Create a prune input with a previous checkpoint without a tx number (unexpected)
        let input = PruneInput {
            previous_checkpoint: Some(PruneCheckpoint {
                block_number: Some(5),
                tx_number: None,
                prune_mode: PruneMode::Full,
            }),
            to_block: 10,
            limiter: PruneLimiter::default(),
        };

        // Blocks 0..=10 with some transactions
        let (provider, num_txs) = provider_with_random_blocks(BlockRangeParams {
            parent: Some(B256::ZERO),
            tx_count: 0..5,
            ..Default::default()
        });

        // Without a tx number in the checkpoint the range is expected to start from 0
        let range = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap();

        assert_eq!(range, 0..=num_txs - 1,);
    }

    #[test]
    fn test_prune_input_get_next_tx_empty_range() {
        // Blocks 0..=10 with some transactions
        let (provider, num_txs) = provider_with_random_blocks(BlockRangeParams {
            parent: Some(B256::ZERO),
            tx_count: 0..5,
            ..Default::default()
        });

        // Get the last tx number
        let max_range = num_txs - 1;

        // Create a prune input with a previous checkpoint that is the last tx number
        let input = PruneInput {
            previous_checkpoint: Some(PruneCheckpoint {
                block_number: Some(5),
                tx_number: Some(max_range),
                prune_mode: PruneMode::Full,
            }),
            to_block: 10,
            limiter: PruneLimiter::default(),
        };

        // We expect an empty range since the previous checkpoint is the last tx number
        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
        assert!(range.is_none());
    }
}