reth_prune/segments/user/
merkle_change_sets.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{PruneInput, Segment},
4    PrunerError,
5};
6use alloy_primitives::B256;
7use reth_db_api::{models::BlockNumberHashedAddress, table::Value, tables, transaction::DbTxMut};
8use reth_primitives_traits::NodePrimitives;
9use reth_provider::{
10    errors::provider::ProviderResult, BlockReader, ChainStateBlockReader, DBProvider,
11    NodePrimitivesProvider, PruneCheckpointWriter, TransactionsProvider,
12};
13use reth_prune_types::{
14    PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
15};
16use reth_stages_types::StageId;
17use tracing::{instrument, trace};
18
19#[derive(Debug)]
20pub struct MerkleChangeSets {
21    mode: PruneMode,
22}
23
24impl MerkleChangeSets {
25    pub const fn new(mode: PruneMode) -> Self {
26        Self { mode }
27    }
28}
29
30impl<Provider> Segment<Provider> for MerkleChangeSets
31where
32    Provider: DBProvider<Tx: DbTxMut>
33        + PruneCheckpointWriter
34        + TransactionsProvider
35        + BlockReader
36        + ChainStateBlockReader
37        + NodePrimitivesProvider<Primitives: NodePrimitives<Receipt: Value>>,
38{
39    fn segment(&self) -> PruneSegment {
40        PruneSegment::MerkleChangeSets
41    }
42
43    fn mode(&self) -> Option<PruneMode> {
44        Some(self.mode)
45    }
46
47    fn purpose(&self) -> PrunePurpose {
48        PrunePurpose::User
49    }
50
51    fn required_stage(&self) -> Option<StageId> {
52        Some(StageId::MerkleChangeSets)
53    }
54
55    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
56    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
57        let Some(block_range) = input.get_next_block_range() else {
58            trace!(target: "pruner", "No change sets to prune");
59            return Ok(SegmentOutput::done())
60        };
61
62        let block_range_end = *block_range.end();
63        let mut limiter = input.limiter;
64
65        // Create range for StoragesTrieChangeSets which uses BlockNumberHashedAddress as key
66        let storage_range_start: BlockNumberHashedAddress =
67            (*block_range.start(), B256::ZERO).into();
68        let storage_range_end: BlockNumberHashedAddress =
69            (*block_range.end() + 1, B256::ZERO).into();
70        let storage_range = storage_range_start..storage_range_end;
71
72        let mut last_storages_pruned_block = None;
73        let (storages_pruned, done) =
74            provider.tx_ref().prune_dupsort_table_with_range::<tables::StoragesTrieChangeSets>(
75                storage_range,
76                &mut limiter,
77                |(BlockNumberHashedAddress((block_number, _)), _)| {
78                    last_storages_pruned_block = Some(block_number);
79                },
80            )?;
81
82        trace!(target: "pruner", %storages_pruned, %done, "Pruned storages change sets");
83
84        let mut last_accounts_pruned_block = block_range_end;
85        let last_storages_pruned_block = last_storages_pruned_block
86            // If there's more storage changesets to prune, set the checkpoint block number to
87            // previous, so we could finish pruning its storage changesets on the next run.
88            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
89            .unwrap_or(block_range_end);
90
91        let (accounts_pruned, done) =
92            provider.tx_ref().prune_dupsort_table_with_range::<tables::AccountsTrieChangeSets>(
93                block_range,
94                &mut limiter,
95                |row| last_accounts_pruned_block = row.0,
96            )?;
97
98        trace!(target: "pruner", %accounts_pruned, %done, "Pruned accounts change sets");
99
100        let progress = limiter.progress(done);
101
102        Ok(SegmentOutput {
103            progress,
104            pruned: accounts_pruned + storages_pruned,
105            checkpoint: Some(SegmentOutputCheckpoint {
106                block_number: Some(last_storages_pruned_block.min(last_accounts_pruned_block)),
107                tx_number: None,
108            }),
109        })
110    }
111
112    fn save_checkpoint(
113        &self,
114        provider: &Provider,
115        checkpoint: PruneCheckpoint,
116    ) -> ProviderResult<()> {
117        provider.save_prune_checkpoint(PruneSegment::MerkleChangeSets, checkpoint)
118    }
119}