reth_prune/segments/
mod.rs

1mod receipts;
2mod set;
3mod user;
4
5use crate::{PruneLimiter, PrunerError};
6use alloy_primitives::{BlockNumber, TxNumber};
7use reth_provider::{errors::provider::ProviderResult, BlockReader, PruneCheckpointWriter};
8use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput};
9pub use set::SegmentSet;
10use std::{fmt::Debug, ops::RangeInclusive};
11use tracing::error;
12pub use user::{
13    AccountHistory, Bodies, MerkleChangeSets, Receipts as UserReceipts, SenderRecovery,
14    StorageHistory, TransactionLookup,
15};
16
17/// A segment represents a pruning of some portion of the data.
18///
19/// Segments are called from [`Pruner`](crate::Pruner) with the following lifecycle:
20/// 1. Call [`Segment::prune`] with `delete_limit` of [`PruneInput`].
21/// 2. If [`Segment::prune`] returned a [`Some`] in `checkpoint` of [`SegmentOutput`], call
22///    [`Segment::save_checkpoint`].
23/// 3. Subtract `pruned` of [`SegmentOutput`] from `delete_limit` of next [`PruneInput`].
24pub trait Segment<Provider>: Debug + Send + Sync {
25    /// Segment of data that's pruned.
26    fn segment(&self) -> PruneSegment;
27
28    /// Prune mode with which the segment was initialized.
29    fn mode(&self) -> Option<PruneMode>;
30
31    /// Purpose of the segment.
32    fn purpose(&self) -> PrunePurpose;
33
34    /// Prune data for [`Self::segment`] using the provided input.
35    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError>;
36
37    /// Save checkpoint for [`Self::segment`] to the database.
38    fn save_checkpoint(
39        &self,
40        provider: &Provider,
41        checkpoint: PruneCheckpoint,
42    ) -> ProviderResult<()>
43    where
44        Provider: PruneCheckpointWriter,
45    {
46        provider.save_prune_checkpoint(self.segment(), checkpoint)
47    }
48}
49
50/// Segment pruning input, see [`Segment::prune`].
51#[derive(Debug)]
52#[cfg_attr(test, derive(Clone))]
53pub struct PruneInput {
54    pub(crate) previous_checkpoint: Option<PruneCheckpoint>,
55    /// Target block up to which the pruning needs to be done, inclusive.
56    pub(crate) to_block: BlockNumber,
57    /// Limits pruning of a segment.
58    pub(crate) limiter: PruneLimiter,
59}
60
61impl PruneInput {
62    /// Get next inclusive tx number range to prune according to the checkpoint and `to_block` block
63    /// number.
64    ///
65    /// To get the range start:
66    /// 1. If checkpoint exists, get next block body and return its first tx number.
67    /// 2. If checkpoint doesn't exist, return 0.
68    ///
69    /// To get the range end: get last tx number for `to_block`.
70    pub(crate) fn get_next_tx_num_range<Provider: BlockReader>(
71        &self,
72        provider: &Provider,
73    ) -> ProviderResult<Option<RangeInclusive<TxNumber>>> {
74        let from_tx_number = self.previous_checkpoint
75            // Checkpoint exists, prune from the next transaction after the highest pruned one
76            .and_then(|checkpoint| match checkpoint.tx_number {
77                Some(tx_number) => Some(tx_number + 1),
78                _ => {
79                    error!(target: "pruner", ?checkpoint, "Expected transaction number in prune checkpoint, found None");
80                    None
81                },
82            })
83            // No checkpoint exists, prune from genesis
84            .unwrap_or_default();
85
86        let to_tx_number = match provider.block_body_indices(self.to_block)? {
87            Some(body) => {
88                let last_tx = body.last_tx_num();
89                if last_tx + body.tx_count() == 0 {
90                    // Prevents a scenario where the pruner correctly starts at a finalized block,
91                    // but the first transaction (tx_num = 0) only appears on a non-finalized one.
92                    // Should only happen on a test/hive scenario.
93                    return Ok(None)
94                }
95                last_tx
96            }
97            None => return Ok(None),
98        };
99
100        let range = from_tx_number..=to_tx_number;
101        if range.is_empty() {
102            return Ok(None)
103        }
104
105        Ok(Some(range))
106    }
107
108    /// Get next inclusive block range to prune according to the checkpoint, `to_block` block
109    /// number and `limit`.
110    ///
111    /// To get the range start (`from_block`):
112    /// 1. If checkpoint exists, use next block.
113    /// 2. If checkpoint doesn't exist, use block 0.
114    ///
115    /// To get the range end: use block `to_block`.
116    pub(crate) fn get_next_block_range(&self) -> Option<RangeInclusive<BlockNumber>> {
117        let from_block = self.get_start_next_block_range();
118        let range = from_block..=self.to_block;
119        if range.is_empty() {
120            return None
121        }
122
123        Some(range)
124    }
125
126    /// Returns the start of the next block range.
127    ///
128    /// 1. If checkpoint exists, use next block.
129    /// 2. If checkpoint doesn't exist, use block 0.
130    pub(crate) fn get_start_next_block_range(&self) -> u64 {
131        self.previous_checkpoint
132            .and_then(|checkpoint| checkpoint.block_number)
133            // Checkpoint exists, prune from the next block after the highest pruned one
134            .map(|block_number| block_number + 1)
135            // No checkpoint exists, prune from genesis
136            .unwrap_or(0)
137    }
138}
139
140#[cfg(test)]
141mod tests {
142    use super::*;
143    use alloy_primitives::B256;
144    use reth_provider::{
145        providers::BlockchainProvider,
146        test_utils::{create_test_provider_factory, MockEthProvider},
147        BlockWriter,
148    };
149    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
150
151    #[test]
152    fn test_prune_input_get_next_tx_num_range_no_to_block() {
153        let input = PruneInput {
154            previous_checkpoint: None,
155            to_block: 10,
156            limiter: PruneLimiter::default(),
157        };
158
159        // Default provider with no block corresponding to block 10
160        let provider = MockEthProvider::default();
161
162        // No block body for block 10, expected None
163        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
164        assert!(range.is_none());
165    }
166
167    #[test]
168    fn test_prune_input_get_next_tx_num_range_no_tx() {
169        let input = PruneInput {
170            previous_checkpoint: None,
171            to_block: 10,
172            limiter: PruneLimiter::default(),
173        };
174
175        let mut rng = generators::rng();
176        let factory = create_test_provider_factory();
177
178        // Generate 10 random blocks with no transactions
179        let blocks = random_block_range(
180            &mut rng,
181            0..=10,
182            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
183        );
184
185        // Insert the blocks into the database
186        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
187        for block in &blocks {
188            provider_rw
189                .insert_block(
190                    block.clone().try_recover().expect("failed to seal block with senders"),
191                )
192                .expect("failed to insert block");
193        }
194        provider_rw.commit().expect("failed to commit");
195
196        // Create a new provider
197        let provider = BlockchainProvider::new(factory).unwrap();
198
199        // Since there are no transactions, expected None
200        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
201        assert!(range.is_none());
202    }
203
204    #[test]
205    fn test_prune_input_get_next_tx_num_range_valid() {
206        // Create a new prune input
207        let input = PruneInput {
208            previous_checkpoint: None,
209            to_block: 10,
210            limiter: PruneLimiter::default(),
211        };
212
213        let mut rng = generators::rng();
214        let factory = create_test_provider_factory();
215
216        // Generate 10 random blocks with some transactions
217        let blocks = random_block_range(
218            &mut rng,
219            0..=10,
220            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..5, ..Default::default() },
221        );
222
223        // Insert the blocks into the database
224        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
225        for block in &blocks {
226            provider_rw
227                .insert_block(
228                    block.clone().try_recover().expect("failed to seal block with senders"),
229                )
230                .expect("failed to insert block");
231        }
232        provider_rw.commit().expect("failed to commit");
233
234        // Create a new provider
235        let provider = BlockchainProvider::new(factory).unwrap();
236
237        // Get the next tx number range
238        let range = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap();
239
240        // Calculate the total number of transactions
241        let num_txs = blocks.iter().map(|block| block.transaction_count() as u64).sum::<u64>();
242
243        assert_eq!(range, 0..=num_txs - 1);
244    }
245
246    #[test]
247    fn test_prune_input_get_next_tx_checkpoint_without_tx_number() {
248        // Create a prune input with a previous checkpoint without a tx number (unexpected)
249        let input = PruneInput {
250            previous_checkpoint: Some(PruneCheckpoint {
251                block_number: Some(5),
252                tx_number: None,
253                prune_mode: PruneMode::Full,
254            }),
255            to_block: 10,
256            limiter: PruneLimiter::default(),
257        };
258
259        let mut rng = generators::rng();
260        let factory = create_test_provider_factory();
261
262        // Generate 10 random blocks
263        let blocks = random_block_range(
264            &mut rng,
265            0..=10,
266            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..5, ..Default::default() },
267        );
268
269        // Insert the blocks into the database
270        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
271        for block in &blocks {
272            provider_rw
273                .insert_block(
274                    block.clone().try_recover().expect("failed to seal block with senders"),
275                )
276                .expect("failed to insert block");
277        }
278        provider_rw.commit().expect("failed to commit");
279
280        // Create a new provider
281        let provider = BlockchainProvider::new(factory).unwrap();
282
283        // Fetch the range and check if it is correct
284        let range = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap();
285
286        // Calculate the total number of transactions
287        let num_txs = blocks.iter().map(|block| block.transaction_count() as u64).sum::<u64>();
288
289        assert_eq!(range, 0..=num_txs - 1,);
290    }
291
292    #[test]
293    fn test_prune_input_get_next_tx_empty_range() {
294        // Create a new provider via factory
295        let mut rng = generators::rng();
296        let factory = create_test_provider_factory();
297
298        // Generate 10 random blocks
299        let blocks = random_block_range(
300            &mut rng,
301            0..=10,
302            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..5, ..Default::default() },
303        );
304
305        // Insert the blocks into the database
306        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
307        for block in &blocks {
308            provider_rw
309                .insert_block(
310                    block.clone().try_recover().expect("failed to seal block with senders"),
311                )
312                .expect("failed to insert block");
313        }
314        provider_rw.commit().expect("failed to commit");
315
316        // Create a new provider
317        let provider = BlockchainProvider::new(factory).unwrap();
318
319        // Get the last tx number
320        // Calculate the total number of transactions
321        let num_txs = blocks.iter().map(|block| block.transaction_count() as u64).sum::<u64>();
322        let max_range = num_txs - 1;
323
324        // Create a prune input with a previous checkpoint that is the last tx number
325        let input = PruneInput {
326            previous_checkpoint: Some(PruneCheckpoint {
327                block_number: Some(5),
328                tx_number: Some(max_range),
329                prune_mode: PruneMode::Full,
330            }),
331            to_block: 10,
332            limiter: PruneLimiter::default(),
333        };
334
335        // We expect an empty range since the previous checkpoint is the last tx number
336        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
337        assert!(range.is_none());
338    }
339}