reth_prune/segments/user/
history.rs1use crate::PruneLimiter;
2use alloy_primitives::BlockNumber;
3use itertools::Itertools;
4use reth_db_api::{
5 cursor::{DbCursorRO, DbCursorRW},
6 models::ShardedKey,
7 table::Table,
8 transaction::DbTxMut,
9 BlockNumberList, DatabaseError, RawKey, RawTable, RawValue,
10};
11use reth_provider::DBProvider;
12use reth_prune_types::{SegmentOutput, SegmentOutputCheckpoint};
13use rustc_hash::FxHashMap;
14
15enum PruneShardOutcome {
16 Deleted,
17 Updated,
18 Unchanged,
19}
20
21#[derive(Debug, Default)]
22pub(crate) struct PrunedIndices {
23 pub(crate) deleted: usize,
24 pub(crate) updated: usize,
25 pub(crate) unchanged: usize,
26}
27
28pub(crate) struct HistoryPruneResult<K> {
30 pub(crate) highest_deleted: FxHashMap<K, BlockNumber>,
32 pub(crate) last_pruned_block: Option<BlockNumber>,
34 pub(crate) pruned_count: usize,
36 pub(crate) done: bool,
38}
39
40pub(crate) fn finalize_history_prune<Provider, T, K, SK>(
44 provider: &Provider,
45 result: HistoryPruneResult<K>,
46 range_end: BlockNumber,
47 limiter: &PruneLimiter,
48 to_sharded_key: impl Fn(K, BlockNumber) -> T::Key,
49 key_matches: impl Fn(&T::Key, &T::Key) -> bool,
50) -> Result<SegmentOutput, DatabaseError>
51where
52 Provider: DBProvider<Tx: DbTxMut>,
53 T: Table<Value = BlockNumberList>,
54 T::Key: AsRef<ShardedKey<SK>>,
55 K: Ord,
56{
57 let HistoryPruneResult { highest_deleted, last_pruned_block, pruned_count, done } = result;
58
59 let last_changeset_pruned_block = last_pruned_block
62 .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
63 .unwrap_or(range_end);
64
65 let highest_sharded_keys =
68 highest_deleted.into_iter().sorted_unstable().map(|(key, block_number)| {
69 to_sharded_key(key, block_number.min(last_changeset_pruned_block))
70 });
71
72 let outcomes =
73 prune_history_indices::<Provider, T, _>(provider, highest_sharded_keys, key_matches)?;
74
75 let progress = limiter.progress(done);
76
77 Ok(SegmentOutput {
78 progress,
79 pruned: pruned_count + outcomes.deleted,
80 checkpoint: Some(SegmentOutputCheckpoint {
81 block_number: Some(last_changeset_pruned_block),
82 tx_number: None,
83 }),
84 })
85}
86
87pub(crate) fn prune_history_indices<Provider, T, SK>(
91 provider: &Provider,
92 highest_sharded_keys: impl IntoIterator<Item = T::Key>,
93 key_matches: impl Fn(&T::Key, &T::Key) -> bool,
94) -> Result<PrunedIndices, DatabaseError>
95where
96 Provider: DBProvider<Tx: DbTxMut>,
97 T: Table<Value = BlockNumberList>,
98 T::Key: AsRef<ShardedKey<SK>>,
99{
100 let mut outcomes = PrunedIndices::default();
101 let mut cursor = provider.tx_ref().cursor_write::<RawTable<T>>()?;
102
103 for sharded_key in highest_sharded_keys {
104 let mut shard = cursor.seek(RawKey::new(sharded_key.clone()))?;
107
108 let to_block = sharded_key.as_ref().highest_block_number;
110
111 'shard: loop {
112 let Some((key, block_nums)) =
113 shard.map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v))).transpose()?
114 else {
115 break
116 };
117
118 if key_matches(&key, &sharded_key) {
119 match prune_shard(&mut cursor, key, block_nums, to_block, &key_matches)? {
120 PruneShardOutcome::Deleted => outcomes.deleted += 1,
121 PruneShardOutcome::Updated => outcomes.updated += 1,
122 PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
123 }
124 } else {
125 break 'shard
127 }
128
129 shard = cursor.next()?;
130 }
131 }
132
133 Ok(outcomes)
134}
135
136fn prune_shard<C, T, SK>(
144 cursor: &mut C,
145 key: T::Key,
146 raw_blocks: RawValue<T::Value>,
147 to_block: BlockNumber,
148 key_matches: impl Fn(&T::Key, &T::Key) -> bool,
149) -> Result<PruneShardOutcome, DatabaseError>
150where
151 C: DbCursorRO<RawTable<T>> + DbCursorRW<RawTable<T>>,
152 T: Table<Value = BlockNumberList>,
153 T::Key: AsRef<ShardedKey<SK>>,
154{
155 if key.as_ref().highest_block_number <= to_block {
158 cursor.delete_current()?;
159 Ok(PruneShardOutcome::Deleted)
160 }
161 else {
165 let blocks = raw_blocks.value()?;
166 let higher_blocks =
167 blocks.iter().skip_while(|block| *block <= to_block).collect::<Vec<_>>();
168
169 if blocks.len() as usize == higher_blocks.len() {
172 return Ok(PruneShardOutcome::Unchanged);
173 }
174
175 if higher_blocks.is_empty() {
178 if key.as_ref().highest_block_number == u64::MAX {
179 let prev_row = cursor
180 .prev()?
181 .map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v)))
182 .transpose()?;
183 match prev_row {
184 Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => {
187 cursor.delete_current()?;
188 cursor.upsert(RawKey::new(key), &prev_value)?;
191 Ok(PruneShardOutcome::Updated)
192 }
193 _ => {
196 if prev_row.is_some() {
199 cursor.next()?;
200 }
201 cursor.delete_current()?;
203 Ok(PruneShardOutcome::Deleted)
204 }
205 }
206 }
207 else {
210 cursor.delete_current()?;
211 Ok(PruneShardOutcome::Deleted)
212 }
213 } else {
214 cursor.upsert(
215 RawKey::new(key),
216 &RawValue::new(BlockNumberList::new_pre_sorted(higher_blocks)),
217 )?;
218 Ok(PruneShardOutcome::Updated)
219 }
220 }
221}