Skip to main content

reth_prune/segments/user/
history.rs

1use crate::PruneLimiter;
2use alloy_primitives::BlockNumber;
3use itertools::Itertools;
4use reth_db_api::{
5    cursor::{DbCursorRO, DbCursorRW},
6    models::ShardedKey,
7    table::Table,
8    transaction::DbTxMut,
9    BlockNumberList, DatabaseError, RawKey, RawTable, RawValue,
10};
11use reth_provider::DBProvider;
12use reth_prune_types::{SegmentOutput, SegmentOutputCheckpoint};
13use rustc_hash::FxHashMap;
14
15enum PruneShardOutcome {
16    Deleted,
17    Updated,
18    Unchanged,
19}
20
21#[derive(Debug, Default)]
22pub(crate) struct PrunedIndices {
23    pub(crate) deleted: usize,
24    pub(crate) updated: usize,
25    pub(crate) unchanged: usize,
26}
27
28/// Result of pruning history changesets, used to build the final output.
29pub(crate) struct HistoryPruneResult<K> {
30    /// Map of the highest deleted changeset keys to their block numbers.
31    pub(crate) highest_deleted: FxHashMap<K, BlockNumber>,
32    /// The last block number that had changesets pruned.
33    pub(crate) last_pruned_block: Option<BlockNumber>,
34    /// Number of changesets pruned.
35    pub(crate) pruned_count: usize,
36    /// Whether pruning is complete.
37    pub(crate) done: bool,
38}
39
40/// Finalizes history pruning by sorting sharded keys, pruning history indices, and building output.
41///
42/// This is shared between static file and database pruning for both account and storage history.
43pub(crate) fn finalize_history_prune<Provider, T, K, SK>(
44    provider: &Provider,
45    result: HistoryPruneResult<K>,
46    range_end: BlockNumber,
47    limiter: &PruneLimiter,
48    to_sharded_key: impl Fn(K, BlockNumber) -> T::Key,
49    key_matches: impl Fn(&T::Key, &T::Key) -> bool,
50) -> Result<SegmentOutput, DatabaseError>
51where
52    Provider: DBProvider<Tx: DbTxMut>,
53    T: Table<Value = BlockNumberList>,
54    T::Key: AsRef<ShardedKey<SK>>,
55    K: Ord,
56{
57    let HistoryPruneResult { highest_deleted, last_pruned_block, pruned_count, done } = result;
58
59    // If there's more changesets to prune, set the checkpoint block number to previous,
60    // so we could finish pruning its changesets on the next run.
61    let last_changeset_pruned_block = last_pruned_block
62        .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
63        .unwrap_or(range_end);
64
65    // Sort highest deleted block numbers and turn them into sharded keys.
66    // We use `sorted_unstable` because no equal keys exist in the map.
67    let highest_sharded_keys =
68        highest_deleted.into_iter().sorted_unstable().map(|(key, block_number)| {
69            to_sharded_key(key, block_number.min(last_changeset_pruned_block))
70        });
71
72    let outcomes =
73        prune_history_indices::<Provider, T, _>(provider, highest_sharded_keys, key_matches)?;
74
75    let progress = limiter.progress(done);
76
77    Ok(SegmentOutput {
78        progress,
79        pruned: pruned_count + outcomes.deleted,
80        checkpoint: Some(SegmentOutputCheckpoint {
81            block_number: Some(last_changeset_pruned_block),
82            tx_number: None,
83        }),
84    })
85}
86
87/// Prune history indices according to the provided list of highest sharded keys.
88///
89/// Returns total number of deleted, updated and unchanged entities.
90pub(crate) fn prune_history_indices<Provider, T, SK>(
91    provider: &Provider,
92    highest_sharded_keys: impl IntoIterator<Item = T::Key>,
93    key_matches: impl Fn(&T::Key, &T::Key) -> bool,
94) -> Result<PrunedIndices, DatabaseError>
95where
96    Provider: DBProvider<Tx: DbTxMut>,
97    T: Table<Value = BlockNumberList>,
98    T::Key: AsRef<ShardedKey<SK>>,
99{
100    let mut outcomes = PrunedIndices::default();
101    let mut cursor = provider.tx_ref().cursor_write::<RawTable<T>>()?;
102
103    for sharded_key in highest_sharded_keys {
104        // Seek to the shard that has the key >= the given sharded key
105        // TODO: optimize
106        let mut shard = cursor.seek(RawKey::new(sharded_key.clone()))?;
107
108        // Get the highest block number that needs to be deleted for this sharded key
109        let to_block = sharded_key.as_ref().highest_block_number;
110
111        'shard: loop {
112            let Some((key, block_nums)) =
113                shard.map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v))).transpose()?
114            else {
115                break
116            };
117
118            if key_matches(&key, &sharded_key) {
119                match prune_shard(&mut cursor, key, block_nums, to_block, &key_matches)? {
120                    PruneShardOutcome::Deleted => outcomes.deleted += 1,
121                    PruneShardOutcome::Updated => outcomes.updated += 1,
122                    PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
123                }
124            } else {
125                // If such shard doesn't exist, skip to the next sharded key
126                break 'shard
127            }
128
129            shard = cursor.next()?;
130        }
131    }
132
133    Ok(outcomes)
134}
135
136/// Prunes one shard of a history table.
137///
138/// 1. If the shard has `highest_block_number` less than or equal to the target block number for
139///    pruning, delete the shard completely.
140/// 2. If the shard has `highest_block_number` greater than the target block number for pruning,
141///    filter block numbers inside the shard which are less than the target block number for
142///    pruning.
143fn prune_shard<C, T, SK>(
144    cursor: &mut C,
145    key: T::Key,
146    raw_blocks: RawValue<T::Value>,
147    to_block: BlockNumber,
148    key_matches: impl Fn(&T::Key, &T::Key) -> bool,
149) -> Result<PruneShardOutcome, DatabaseError>
150where
151    C: DbCursorRO<RawTable<T>> + DbCursorRW<RawTable<T>>,
152    T: Table<Value = BlockNumberList>,
153    T::Key: AsRef<ShardedKey<SK>>,
154{
155    // If shard consists only of block numbers less than the target one, delete shard
156    // completely.
157    if key.as_ref().highest_block_number <= to_block {
158        cursor.delete_current()?;
159        Ok(PruneShardOutcome::Deleted)
160    }
161    // Shard contains block numbers that are higher than the target one, so we need to
162    // filter it. It is guaranteed that further shards for this sharded key will not
163    // contain the target block number, as it's in this shard.
164    else {
165        let blocks = raw_blocks.value()?;
166        let higher_blocks =
167            blocks.iter().skip_while(|block| *block <= to_block).collect::<Vec<_>>();
168
169        // If there were blocks less than or equal to the target one
170        // (so the shard has changed), update the shard.
171        if blocks.len() as usize == higher_blocks.len() {
172            return Ok(PruneShardOutcome::Unchanged);
173        }
174
175        // If there will be no more blocks in the shard after pruning blocks below target
176        // block, we need to remove it, as empty shards are not allowed.
177        if higher_blocks.is_empty() {
178            if key.as_ref().highest_block_number == u64::MAX {
179                let prev_row = cursor
180                    .prev()?
181                    .map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v)))
182                    .transpose()?;
183                match prev_row {
184                    // If current shard is the last shard for the sharded key that
185                    // has previous shards, replace it with the previous shard.
186                    Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => {
187                        cursor.delete_current()?;
188                        // Upsert will replace the last shard for this sharded key with
189                        // the previous value.
190                        cursor.upsert(RawKey::new(key), &prev_value)?;
191                        Ok(PruneShardOutcome::Updated)
192                    }
193                    // If there's no previous shard for this sharded key,
194                    // just delete last shard completely.
195                    _ => {
196                        // If we successfully moved the cursor to a previous row,
197                        // jump to the original last shard.
198                        if prev_row.is_some() {
199                            cursor.next()?;
200                        }
201                        // Delete shard.
202                        cursor.delete_current()?;
203                        Ok(PruneShardOutcome::Deleted)
204                    }
205                }
206            }
207            // If current shard is not the last shard for this sharded key,
208            // just delete it.
209            else {
210                cursor.delete_current()?;
211                Ok(PruneShardOutcome::Deleted)
212            }
213        } else {
214            cursor.upsert(
215                RawKey::new(key),
216                &RawValue::new(BlockNumberList::new_pre_sorted(higher_blocks)),
217            )?;
218            Ok(PruneShardOutcome::Updated)
219        }
220    }
221}