1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
use alloy_primitives::BlockNumber;
use reth_db::{BlockNumberList, RawKey, RawTable, RawValue};
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW},
    models::ShardedKey,
    table::Table,
    transaction::DbTxMut,
    DatabaseError,
};
use reth_provider::DBProvider;

enum PruneShardOutcome {
    Deleted,
    Updated,
    Unchanged,
}

#[derive(Debug, Default)]
pub(crate) struct PrunedIndices {
    pub(crate) deleted: usize,
    pub(crate) updated: usize,
    pub(crate) unchanged: usize,
}

/// Prune history indices according to the provided list of highest sharded keys.
///
/// Returns total number of deleted, updated and unchanged entities.
pub(crate) fn prune_history_indices<Provider, T, SK>(
    provider: &Provider,
    highest_sharded_keys: impl IntoIterator<Item = T::Key>,
    key_matches: impl Fn(&T::Key, &T::Key) -> bool,
) -> Result<PrunedIndices, DatabaseError>
where
    Provider: DBProvider<Tx: DbTxMut>,
    T: Table<Value = BlockNumberList>,
    T::Key: AsRef<ShardedKey<SK>>,
{
    let mut outcomes = PrunedIndices::default();
    let mut cursor = provider.tx_ref().cursor_write::<RawTable<T>>()?;

    for sharded_key in highest_sharded_keys {
        // Seek to the shard that has the key >= the given sharded key
        // TODO: optimize
        let mut shard = cursor.seek(RawKey::new(sharded_key.clone()))?;

        // Get the highest block number that needs to be deleted for this sharded key
        let to_block = sharded_key.as_ref().highest_block_number;

        'shard: loop {
            let Some((key, block_nums)) =
                shard.map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v))).transpose()?
            else {
                break
            };

            if key_matches(&key, &sharded_key) {
                match prune_shard(&mut cursor, key, block_nums, to_block, &key_matches)? {
                    PruneShardOutcome::Deleted => outcomes.deleted += 1,
                    PruneShardOutcome::Updated => outcomes.updated += 1,
                    PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
                }
            } else {
                // If such shard doesn't exist, skip to the next sharded key
                break 'shard
            }

            shard = cursor.next()?;
        }
    }

    Ok(outcomes)
}

/// Prunes one shard of a history table.
///
/// 1. If the shard has `highest_block_number` less than or equal to the target block number for
///    pruning, delete the shard completely.
/// 2. If the shard has `highest_block_number` greater than the target block number for pruning,
///    filter block numbers inside the shard which are less than the target block number for
///    pruning.
fn prune_shard<C, T, SK>(
    cursor: &mut C,
    key: T::Key,
    raw_blocks: RawValue<T::Value>,
    to_block: BlockNumber,
    key_matches: impl Fn(&T::Key, &T::Key) -> bool,
) -> Result<PruneShardOutcome, DatabaseError>
where
    C: DbCursorRO<RawTable<T>> + DbCursorRW<RawTable<T>>,
    T: Table<Value = BlockNumberList>,
    T::Key: AsRef<ShardedKey<SK>>,
{
    // If shard consists only of block numbers less than the target one, delete shard
    // completely.
    if key.as_ref().highest_block_number <= to_block {
        cursor.delete_current()?;
        Ok(PruneShardOutcome::Deleted)
    }
    // Shard contains block numbers that are higher than the target one, so we need to
    // filter it. It is guaranteed that further shards for this sharded key will not
    // contain the target block number, as it's in this shard.
    else {
        let blocks = raw_blocks.value()?;
        let higher_blocks =
            blocks.iter().skip_while(|block| *block <= to_block).collect::<Vec<_>>();

        // If there were blocks less than or equal to the target one
        // (so the shard has changed), update the shard.
        if blocks.len() as usize == higher_blocks.len() {
            return Ok(PruneShardOutcome::Unchanged);
        }

        // If there will be no more blocks in the shard after pruning blocks below target
        // block, we need to remove it, as empty shards are not allowed.
        if higher_blocks.is_empty() {
            if key.as_ref().highest_block_number == u64::MAX {
                let prev_row = cursor
                    .prev()?
                    .map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v)))
                    .transpose()?;
                match prev_row {
                    // If current shard is the last shard for the sharded key that
                    // has previous shards, replace it with the previous shard.
                    Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => {
                        cursor.delete_current()?;
                        // Upsert will replace the last shard for this sharded key with
                        // the previous value.
                        cursor.upsert(RawKey::new(key), prev_value)?;
                        Ok(PruneShardOutcome::Updated)
                    }
                    // If there's no previous shard for this sharded key,
                    // just delete last shard completely.
                    _ => {
                        // If we successfully moved the cursor to a previous row,
                        // jump to the original last shard.
                        if prev_row.is_some() {
                            cursor.next()?;
                        }
                        // Delete shard.
                        cursor.delete_current()?;
                        Ok(PruneShardOutcome::Deleted)
                    }
                }
            }
            // If current shard is not the last shard for this sharded key,
            // just delete it.
            else {
                cursor.delete_current()?;
                Ok(PruneShardOutcome::Deleted)
            }
        } else {
            cursor.upsert(
                RawKey::new(key),
                RawValue::new(BlockNumberList::new_pre_sorted(higher_blocks)),
            )?;
            Ok(PruneShardOutcome::Updated)
        }
    }
}