reth_prune/segments/static_file/
headers.rs
1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment},
4 PruneLimiter, PrunerError,
5};
6use alloy_primitives::BlockNumber;
7use itertools::Itertools;
8use reth_db_api::{
9 cursor::{DbCursorRO, RangeWalker},
10 tables,
11 transaction::DbTxMut,
12};
13use reth_provider::{providers::StaticFileProvider, DBProvider, StaticFileProviderFactory};
14use reth_prune_types::{
15 PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
16};
17use reth_static_file_types::StaticFileSegment;
18use std::num::NonZeroUsize;
19use tracing::trace;
20
/// Number of header-related tables pruned together for every block:
/// `Headers`, `HeaderTerminalDifficulties` and `CanonicalHeaders`.
const HEADER_TABLES_TO_PRUNE: usize = 3;
23
/// Prune segment for the header-related database tables.
///
/// Holds the static file provider that is queried for the highest header block
/// available in static files.
#[derive(Debug)]
pub struct Headers<N> {
    /// Static file provider used to look up the highest static file block for headers.
    static_file_provider: StaticFileProvider<N>,
}
28
impl<N> Headers<N> {
    /// Creates a new [`Headers`] segment that holds the given static file provider.
    pub const fn new(static_file_provider: StaticFileProvider<N>) -> Self {
        Self { static_file_provider }
    }
}
34
35impl<Provider: StaticFileProviderFactory + DBProvider<Tx: DbTxMut>> Segment<Provider>
36 for Headers<Provider::Primitives>
37{
38 fn segment(&self) -> PruneSegment {
39 PruneSegment::Headers
40 }
41
42 fn mode(&self) -> Option<PruneMode> {
43 self.static_file_provider
44 .get_highest_static_file_block(StaticFileSegment::Headers)
45 .map(PruneMode::before_inclusive)
46 }
47
48 fn purpose(&self) -> PrunePurpose {
49 PrunePurpose::StaticFile
50 }
51
52 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
53 let (block_range_start, block_range_end) = match input.get_next_block_range() {
54 Some(range) => (*range.start(), *range.end()),
55 None => {
56 trace!(target: "pruner", "No headers to prune");
57 return Ok(SegmentOutput::done())
58 }
59 };
60
61 let last_pruned_block =
62 if block_range_start == 0 { None } else { Some(block_range_start - 1) };
63
64 let range = last_pruned_block.map_or(0, |block| block + 1)..=block_range_end;
65
66 let mut headers_cursor = provider.tx_ref().cursor_write::<tables::Headers>()?;
67 let mut header_tds_cursor =
68 provider.tx_ref().cursor_write::<tables::HeaderTerminalDifficulties>()?;
69 let mut canonical_headers_cursor =
70 provider.tx_ref().cursor_write::<tables::CanonicalHeaders>()?;
71
72 let mut limiter = input.limiter.floor_deleted_entries_limit_to_multiple_of(
73 NonZeroUsize::new(HEADER_TABLES_TO_PRUNE).unwrap(),
74 );
75
76 let tables_iter = HeaderTablesIter::new(
77 provider,
78 &mut limiter,
79 headers_cursor.walk_range(range.clone())?,
80 header_tds_cursor.walk_range(range.clone())?,
81 canonical_headers_cursor.walk_range(range)?,
82 );
83
84 let mut last_pruned_block: Option<u64> = None;
85 let mut pruned = 0;
86 for res in tables_iter {
87 let HeaderTablesIterItem { pruned_block, entries_pruned } = res?;
88 last_pruned_block = Some(pruned_block);
89 pruned += entries_pruned;
90 }
91
92 let done = last_pruned_block == Some(block_range_end);
93 let progress = limiter.progress(done);
94
95 Ok(SegmentOutput {
96 progress,
97 pruned,
98 checkpoint: Some(SegmentOutputCheckpoint {
99 block_number: last_pruned_block,
100 tx_number: None,
101 }),
102 })
103 }
104}
/// Range walker over table `T` that uses the mutable cursor type of the provider's transaction.
type Walker<'a, Provider, T> =
    RangeWalker<'a, T, <<Provider as DBProvider>::Tx as DbTxMut>::CursorMut<T>>;
107
/// Iterator state for pruning the three header-related tables together.
///
/// Bundles the provider, the shared prune limiter and one range walker per table.
#[allow(missing_debug_implementations)]
struct HeaderTablesIter<'a, Provider>
where
    Provider: DBProvider<Tx: DbTxMut>,
{
    /// Provider whose transaction performs the pruning.
    provider: &'a Provider,
    /// Limiter shared by all three tables.
    limiter: &'a mut PruneLimiter,
    /// Walker over the `Headers` table.
    headers_walker: Walker<'a, Provider, tables::Headers>,
    /// Walker over the `HeaderTerminalDifficulties` table.
    header_tds_walker: Walker<'a, Provider, tables::HeaderTerminalDifficulties>,
    /// Walker over the `CanonicalHeaders` table.
    canonical_headers_walker: Walker<'a, Provider, tables::CanonicalHeaders>,
}
119
/// Result of a single successful step of [`HeaderTablesIter`].
struct HeaderTablesIterItem {
    /// Block number that was pruned in this step.
    pruned_block: BlockNumber,
    /// Number of table entries counted as pruned in this step.
    entries_pruned: usize,
}
124
impl<'a, Provider> HeaderTablesIter<'a, Provider>
where
    Provider: DBProvider<Tx: DbTxMut>,
{
    /// Creates the iterator from a provider, a limiter and one walker per header table.
    ///
    /// The arguments are stored as given; no pruning happens here.
    const fn new(
        provider: &'a Provider,
        limiter: &'a mut PruneLimiter,
        headers_walker: Walker<'a, Provider, tables::Headers>,
        header_tds_walker: Walker<'a, Provider, tables::HeaderTerminalDifficulties>,
        canonical_headers_walker: Walker<'a, Provider, tables::CanonicalHeaders>,
    ) -> Self {
        Self { provider, limiter, headers_walker, header_tds_walker, canonical_headers_walker }
    }
}
139
140impl<Provider> Iterator for HeaderTablesIter<'_, Provider>
141where
142 Provider: DBProvider<Tx: DbTxMut>,
143{
144 type Item = Result<HeaderTablesIterItem, PrunerError>;
145 fn next(&mut self) -> Option<Self::Item> {
146 if self.limiter.is_limit_reached() {
147 return None
148 }
149
150 let mut pruned_block_headers = None;
151 let mut pruned_block_td = None;
152 let mut pruned_block_canonical = None;
153
154 if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
155 &mut self.headers_walker,
156 self.limiter,
157 &mut |_| false,
158 &mut |row| pruned_block_headers = Some(row.0),
159 ) {
160 return Some(Err(err.into()))
161 }
162
163 if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
164 &mut self.header_tds_walker,
165 self.limiter,
166 &mut |_| false,
167 &mut |row| pruned_block_td = Some(row.0),
168 ) {
169 return Some(Err(err.into()))
170 }
171
172 if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
173 &mut self.canonical_headers_walker,
174 self.limiter,
175 &mut |_| false,
176 &mut |row| pruned_block_canonical = Some(row.0),
177 ) {
178 return Some(Err(err.into()))
179 }
180
181 if ![pruned_block_headers, pruned_block_td, pruned_block_canonical].iter().all_equal() {
182 return Some(Err(PrunerError::InconsistentData(
183 "All headers-related tables should be pruned up to the same height",
184 )))
185 }
186
187 pruned_block_headers.map(move |block| {
188 Ok(HeaderTablesIterItem { pruned_block: block, entries_pruned: HEADER_TABLES_TO_PRUNE })
189 })
190 }
191}
192
#[cfg(test)]
mod tests {
    use crate::segments::{
        static_file::headers::HEADER_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
        SegmentOutput,
    };
    use alloy_primitives::{BlockNumber, B256, U256};
    use assert_matches::assert_matches;
    use reth_db_api::{tables, transaction::DbTx};
    use reth_provider::{
        DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
        StaticFileProviderFactory,
    };
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
        SegmentOutputCheckpoint,
    };
    use reth_stages::test_utils::TestStageDB;
    use reth_testing_utils::{generators, generators::random_header_range};
    use tracing::trace;

    /// Prunes headers in two passes with a deleted-entries limit of 10 and checks the reported
    /// progress, the number of pruned entries, the table sizes and the saved checkpoint.
    #[test]
    fn prune() {
        reth_tracing::init_test_tracing();

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Insert random headers for the block range 0..100.
        let headers = random_header_range(&mut rng, 0..100, B256::ZERO);
        let tx = db.factory.provider_rw().unwrap().into_tx();
        for header in &headers {
            TestStageDB::insert_header(None, &tx, header, U256::ZERO).unwrap();
        }
        tx.commit().unwrap();

        // Each of the three header tables holds one entry per inserted header.
        assert_eq!(db.table::<tables::CanonicalHeaders>().unwrap().len(), headers.len());
        assert_eq!(db.table::<tables::Headers>().unwrap().len(), headers.len());
        assert_eq!(db.table::<tables::HeaderTerminalDifficulties>().unwrap().len(), headers.len());

        // Runs a single prune pass up to `to_block` and verifies its outcome against
        // `expected_result` (progress, number of pruned entries).
        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let segment = super::Headers::new(db.factory.static_file_provider());
            let prune_mode = PruneMode::Before(to_block);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::Headers)
                    .unwrap(),
                to_block,
                limiter: limiter.clone(),
            };

            // First block this pass will prune: one past the saved checkpoint, or 0 if there is
            // no checkpoint block yet.
            let next_block_number_to_prune = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::Headers)
                .unwrap()
                .and_then(|checkpoint| checkpoint.block_number)
                .map(|block_number| block_number + 1)
                .unwrap_or_default();

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input.clone()).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);
            trace!(target: "pruner::test",
                expected_prune_progress=?expected_result.0,
                expected_pruned=?expected_result.1,
                result=?result,
                "SegmentOutput"
            );

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );
            // Persist the checkpoint so that the next pass continues after it.
            provider
                .save_prune_checkpoint(
                    PruneSegment::Headers,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // One pass prunes at most `limit / HEADER_TABLES_TO_PRUNE` blocks, capped by
            // `to_block`.
            let last_pruned_block_number = to_block.min(
                next_block_number_to_prune +
                    (input.limiter.deleted_entries_limit().unwrap() / HEADER_TABLES_TO_PRUNE - 1)
                        as u64,
            );

            // Blocks 0..=last_pruned_block_number must be gone from every table.
            assert_eq!(
                db.table::<tables::CanonicalHeaders>().unwrap().len(),
                headers.len() - (last_pruned_block_number + 1) as usize
            );
            assert_eq!(
                db.table::<tables::Headers>().unwrap().len(),
                headers.len() - (last_pruned_block_number + 1) as usize
            );
            assert_eq!(
                db.table::<tables::HeaderTerminalDifficulties>().unwrap().len(),
                headers.len() - (last_pruned_block_number + 1) as usize
            );
            assert_eq!(
                db.factory.provider().unwrap().get_prune_checkpoint(PruneSegment::Headers).unwrap(),
                Some(PruneCheckpoint {
                    block_number: Some(last_pruned_block_number),
                    tx_number: None,
                    prune_mode
                })
            );
        };

        // First pass: expected to stop on the entries limit after pruning 9 entries.
        test_prune(
            3,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 9),
        );
        // Second pass: expected to prune the remaining 3 entries and finish.
        test_prune(3, (PruneProgress::Finished, 3));
    }

    /// With a deleted-entries limit of 0 the segment is expected to report that it is not done
    /// and to return a default checkpoint.
    #[test]
    fn prune_cannot_be_done() {
        let db = TestStageDB::default();

        let limiter = PruneLimiter::default().set_deleted_entries_limit(0);

        let input = PruneInput {
            previous_checkpoint: None,
            to_block: 1,
            // A limit of 0 is less than the number of header tables (`HEADER_TABLES_TO_PRUNE`).
            limiter,
        };

        let provider = db.factory.database_provider_rw().unwrap();
        let segment = super::Headers::new(db.factory.static_file_provider());
        let result = segment.prune(&provider, input).unwrap();
        assert_eq!(
            result,
            SegmentOutput::not_done(
                PruneInterruptReason::DeletedEntriesLimitReached,
                Some(SegmentOutputCheckpoint::default())
            )
        );
    }
}