reth_prune/segments/user/
history.rs

1use alloy_primitives::BlockNumber;
2use reth_db_api::{
3    cursor::{DbCursorRO, DbCursorRW},
4    models::ShardedKey,
5    table::Table,
6    transaction::DbTxMut,
7    BlockNumberList, DatabaseError, RawKey, RawTable, RawValue,
8};
9use reth_provider::DBProvider;
10
/// What happened to a single shard row as a result of pruning it.
enum PruneShardOutcome {
    /// The shard row was removed from the table.
    Deleted,
    /// The shard row was rewritten with a different value.
    Updated,
    /// The shard row was left exactly as it was.
    Unchanged,
}
16
/// Running totals of shard outcomes accumulated over one pruning pass.
#[derive(Debug, Default)]
pub(crate) struct PrunedIndices {
    /// Number of shards that were removed.
    pub(crate) deleted: usize,
    /// Number of shards that were rewritten.
    pub(crate) updated: usize,
    /// Number of shards that were visited but not modified.
    pub(crate) unchanged: usize,
}
23
24/// Prune history indices according to the provided list of highest sharded keys.
25///
26/// Returns total number of deleted, updated and unchanged entities.
27pub(crate) fn prune_history_indices<Provider, T, SK>(
28    provider: &Provider,
29    highest_sharded_keys: impl IntoIterator<Item = T::Key>,
30    key_matches: impl Fn(&T::Key, &T::Key) -> bool,
31) -> Result<PrunedIndices, DatabaseError>
32where
33    Provider: DBProvider<Tx: DbTxMut>,
34    T: Table<Value = BlockNumberList>,
35    T::Key: AsRef<ShardedKey<SK>>,
36{
37    let mut outcomes = PrunedIndices::default();
38    let mut cursor = provider.tx_ref().cursor_write::<RawTable<T>>()?;
39
40    for sharded_key in highest_sharded_keys {
41        // Seek to the shard that has the key >= the given sharded key
42        // TODO: optimize
43        let mut shard = cursor.seek(RawKey::new(sharded_key.clone()))?;
44
45        // Get the highest block number that needs to be deleted for this sharded key
46        let to_block = sharded_key.as_ref().highest_block_number;
47
48        'shard: loop {
49            let Some((key, block_nums)) =
50                shard.map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v))).transpose()?
51            else {
52                break
53            };
54
55            if key_matches(&key, &sharded_key) {
56                match prune_shard(&mut cursor, key, block_nums, to_block, &key_matches)? {
57                    PruneShardOutcome::Deleted => outcomes.deleted += 1,
58                    PruneShardOutcome::Updated => outcomes.updated += 1,
59                    PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
60                }
61            } else {
62                // If such shard doesn't exist, skip to the next sharded key
63                break 'shard
64            }
65
66            shard = cursor.next()?;
67        }
68    }
69
70    Ok(outcomes)
71}
72
73/// Prunes one shard of a history table.
74///
75/// 1. If the shard has `highest_block_number` less than or equal to the target block number for
76///    pruning, delete the shard completely.
77/// 2. If the shard has `highest_block_number` greater than the target block number for pruning,
78///    filter block numbers inside the shard which are less than the target block number for
79///    pruning.
80fn prune_shard<C, T, SK>(
81    cursor: &mut C,
82    key: T::Key,
83    raw_blocks: RawValue<T::Value>,
84    to_block: BlockNumber,
85    key_matches: impl Fn(&T::Key, &T::Key) -> bool,
86) -> Result<PruneShardOutcome, DatabaseError>
87where
88    C: DbCursorRO<RawTable<T>> + DbCursorRW<RawTable<T>>,
89    T: Table<Value = BlockNumberList>,
90    T::Key: AsRef<ShardedKey<SK>>,
91{
92    // If shard consists only of block numbers less than the target one, delete shard
93    // completely.
94    if key.as_ref().highest_block_number <= to_block {
95        cursor.delete_current()?;
96        Ok(PruneShardOutcome::Deleted)
97    }
98    // Shard contains block numbers that are higher than the target one, so we need to
99    // filter it. It is guaranteed that further shards for this sharded key will not
100    // contain the target block number, as it's in this shard.
101    else {
102        let blocks = raw_blocks.value()?;
103        let higher_blocks =
104            blocks.iter().skip_while(|block| *block <= to_block).collect::<Vec<_>>();
105
106        // If there were blocks less than or equal to the target one
107        // (so the shard has changed), update the shard.
108        if blocks.len() as usize == higher_blocks.len() {
109            return Ok(PruneShardOutcome::Unchanged);
110        }
111
112        // If there will be no more blocks in the shard after pruning blocks below target
113        // block, we need to remove it, as empty shards are not allowed.
114        if higher_blocks.is_empty() {
115            if key.as_ref().highest_block_number == u64::MAX {
116                let prev_row = cursor
117                    .prev()?
118                    .map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v)))
119                    .transpose()?;
120                match prev_row {
121                    // If current shard is the last shard for the sharded key that
122                    // has previous shards, replace it with the previous shard.
123                    Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => {
124                        cursor.delete_current()?;
125                        // Upsert will replace the last shard for this sharded key with
126                        // the previous value.
127                        cursor.upsert(RawKey::new(key), &prev_value)?;
128                        Ok(PruneShardOutcome::Updated)
129                    }
130                    // If there's no previous shard for this sharded key,
131                    // just delete last shard completely.
132                    _ => {
133                        // If we successfully moved the cursor to a previous row,
134                        // jump to the original last shard.
135                        if prev_row.is_some() {
136                            cursor.next()?;
137                        }
138                        // Delete shard.
139                        cursor.delete_current()?;
140                        Ok(PruneShardOutcome::Deleted)
141                    }
142                }
143            }
144            // If current shard is not the last shard for this sharded key,
145            // just delete it.
146            else {
147                cursor.delete_current()?;
148                Ok(PruneShardOutcome::Deleted)
149            }
150        } else {
151            cursor.upsert(
152                RawKey::new(key),
153                &RawValue::new(BlockNumberList::new_pre_sorted(higher_blocks)),
154            )?;
155            Ok(PruneShardOutcome::Updated)
156        }
157    }
158}