reth_prune/segments/static_file/
headers.rs1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment},
4 PruneLimiter, PrunerError,
5};
6use alloy_primitives::BlockNumber;
7use itertools::Itertools;
8use reth_db_api::{
9 cursor::{DbCursorRO, RangeWalker},
10 table::Value,
11 tables,
12 transaction::DbTxMut,
13};
14use reth_primitives_traits::NodePrimitives;
15use reth_provider::{providers::StaticFileProvider, DBProvider, StaticFileProviderFactory};
16use reth_prune_types::{
17 PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
18};
19use reth_static_file_types::StaticFileSegment;
20use std::num::NonZeroUsize;
21use tracing::trace;
22
23const HEADER_TABLES_TO_PRUNE: usize = 3;
25
26#[derive(Debug)]
27pub struct Headers<N> {
28 static_file_provider: StaticFileProvider<N>,
29}
30
31impl<N> Headers<N> {
32 pub const fn new(static_file_provider: StaticFileProvider<N>) -> Self {
33 Self { static_file_provider }
34 }
35}
36
37impl<Provider> Segment<Provider> for Headers<Provider::Primitives>
38where
39 Provider: StaticFileProviderFactory<Primitives: NodePrimitives<BlockHeader: Value>>
40 + DBProvider<Tx: DbTxMut>,
41{
42 fn segment(&self) -> PruneSegment {
43 PruneSegment::Headers
44 }
45
46 fn mode(&self) -> Option<PruneMode> {
47 self.static_file_provider
48 .get_highest_static_file_block(StaticFileSegment::Headers)
49 .map(PruneMode::before_inclusive)
50 }
51
52 fn purpose(&self) -> PrunePurpose {
53 PrunePurpose::StaticFile
54 }
55
56 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
57 let (block_range_start, block_range_end) = match input.get_next_block_range() {
58 Some(range) => (*range.start(), *range.end()),
59 None => {
60 trace!(target: "pruner", "No headers to prune");
61 return Ok(SegmentOutput::done())
62 }
63 };
64
65 let last_pruned_block =
66 if block_range_start == 0 { None } else { Some(block_range_start - 1) };
67
68 let range = last_pruned_block.map_or(0, |block| block + 1)..=block_range_end;
69
70 let mut headers_cursor = provider
72 .tx_ref()
73 .cursor_write::<tables::Headers<<Provider::Primitives as NodePrimitives>::BlockHeader>>(
74 )?;
75
76 let mut header_tds_cursor =
77 provider.tx_ref().cursor_write::<tables::HeaderTerminalDifficulties>()?;
78 let mut canonical_headers_cursor =
79 provider.tx_ref().cursor_write::<tables::CanonicalHeaders>()?;
80
81 let mut limiter = input.limiter.floor_deleted_entries_limit_to_multiple_of(
82 NonZeroUsize::new(HEADER_TABLES_TO_PRUNE).unwrap(),
83 );
84
85 let tables_iter = HeaderTablesIter::new(
86 provider,
87 &mut limiter,
88 headers_cursor.walk_range(range.clone())?,
89 header_tds_cursor.walk_range(range.clone())?,
90 canonical_headers_cursor.walk_range(range)?,
91 );
92
93 let mut last_pruned_block: Option<u64> = None;
94 let mut pruned = 0;
95 for res in tables_iter {
96 let HeaderTablesIterItem { pruned_block, entries_pruned } = res?;
97 last_pruned_block = Some(pruned_block);
98 pruned += entries_pruned;
99 }
100
101 let done = last_pruned_block == Some(block_range_end);
102 let progress = limiter.progress(done);
103
104 Ok(SegmentOutput {
105 progress,
106 pruned,
107 checkpoint: Some(SegmentOutputCheckpoint {
108 block_number: last_pruned_block,
109 tx_number: None,
110 }),
111 })
112 }
113}
114type Walker<'a, Provider, T> =
115 RangeWalker<'a, T, <<Provider as DBProvider>::Tx as DbTxMut>::CursorMut<T>>;
116
117#[allow(missing_debug_implementations)]
118struct HeaderTablesIter<'a, Provider>
119where
120 Provider: StaticFileProviderFactory<Primitives: NodePrimitives<BlockHeader: Value>>
121 + DBProvider<Tx: DbTxMut>,
122{
123 provider: &'a Provider,
124 limiter: &'a mut PruneLimiter,
125 headers_walker: Walker<
126 'a,
127 Provider,
128 tables::Headers<<Provider::Primitives as NodePrimitives>::BlockHeader>,
129 >,
130 header_tds_walker: Walker<'a, Provider, tables::HeaderTerminalDifficulties>,
131 canonical_headers_walker: Walker<'a, Provider, tables::CanonicalHeaders>,
132}
133
134struct HeaderTablesIterItem {
135 pruned_block: BlockNumber,
136 entries_pruned: usize,
137}
138
139impl<'a, Provider> HeaderTablesIter<'a, Provider>
140where
141 Provider: StaticFileProviderFactory<Primitives: NodePrimitives<BlockHeader: Value>>
142 + DBProvider<Tx: DbTxMut>,
143{
144 const fn new(
145 provider: &'a Provider,
146 limiter: &'a mut PruneLimiter,
147 headers_walker: Walker<
148 'a,
149 Provider,
150 tables::Headers<<Provider::Primitives as NodePrimitives>::BlockHeader>,
151 >,
152 header_tds_walker: Walker<'a, Provider, tables::HeaderTerminalDifficulties>,
153 canonical_headers_walker: Walker<'a, Provider, tables::CanonicalHeaders>,
154 ) -> Self {
155 Self { provider, limiter, headers_walker, header_tds_walker, canonical_headers_walker }
156 }
157}
158
159impl<Provider> Iterator for HeaderTablesIter<'_, Provider>
160where
161 Provider: StaticFileProviderFactory<Primitives: NodePrimitives<BlockHeader: Value>>
162 + DBProvider<Tx: DbTxMut>,
163{
164 type Item = Result<HeaderTablesIterItem, PrunerError>;
165 fn next(&mut self) -> Option<Self::Item> {
166 if self.limiter.is_limit_reached() {
167 return None
168 }
169
170 let mut pruned_block_headers = None;
171 let mut pruned_block_td = None;
172 let mut pruned_block_canonical = None;
173
174 if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
175 &mut self.headers_walker,
176 self.limiter,
177 &mut |_| false,
178 &mut |row| pruned_block_headers = Some(row.0),
179 ) {
180 return Some(Err(err.into()))
181 }
182
183 if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
184 &mut self.header_tds_walker,
185 self.limiter,
186 &mut |_| false,
187 &mut |row| pruned_block_td = Some(row.0),
188 ) {
189 return Some(Err(err.into()))
190 }
191
192 if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
193 &mut self.canonical_headers_walker,
194 self.limiter,
195 &mut |_| false,
196 &mut |row| pruned_block_canonical = Some(row.0),
197 ) {
198 return Some(Err(err.into()))
199 }
200
201 if ![pruned_block_headers, pruned_block_td, pruned_block_canonical].iter().all_equal() {
202 return Some(Err(PrunerError::InconsistentData(
203 "All headers-related tables should be pruned up to the same height",
204 )))
205 }
206
207 pruned_block_headers.map(move |block| {
208 Ok(HeaderTablesIterItem { pruned_block: block, entries_pruned: HEADER_TABLES_TO_PRUNE })
209 })
210 }
211}
212
213#[cfg(test)]
214mod tests {
215 use crate::segments::{
216 static_file::headers::HEADER_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
217 SegmentOutput,
218 };
219 use alloy_primitives::{BlockNumber, B256, U256};
220 use assert_matches::assert_matches;
221 use reth_db_api::{tables, transaction::DbTx};
222 use reth_provider::{
223 DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
224 StaticFileProviderFactory,
225 };
226 use reth_prune_types::{
227 PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
228 SegmentOutputCheckpoint,
229 };
230 use reth_stages::test_utils::TestStageDB;
231 use reth_testing_utils::{generators, generators::random_header_range};
232 use tracing::trace;
233
234 #[test]
235 fn prune() {
236 reth_tracing::init_test_tracing();
237
238 let db = TestStageDB::default();
239 let mut rng = generators::rng();
240
241 let headers = random_header_range(&mut rng, 0..100, B256::ZERO);
242 let tx = db.factory.provider_rw().unwrap().into_tx();
243 for header in &headers {
244 TestStageDB::insert_header(None, &tx, header, U256::ZERO).unwrap();
245 }
246 tx.commit().unwrap();
247
248 assert_eq!(db.table::<tables::CanonicalHeaders>().unwrap().len(), headers.len());
249 assert_eq!(db.table::<tables::Headers>().unwrap().len(), headers.len());
250 assert_eq!(db.table::<tables::HeaderTerminalDifficulties>().unwrap().len(), headers.len());
251
252 let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
253 let segment = super::Headers::new(db.factory.static_file_provider());
254 let prune_mode = PruneMode::Before(to_block);
255 let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
256 let input = PruneInput {
257 previous_checkpoint: db
258 .factory
259 .provider()
260 .unwrap()
261 .get_prune_checkpoint(PruneSegment::Headers)
262 .unwrap(),
263 to_block,
264 limiter: limiter.clone(),
265 };
266
267 let next_block_number_to_prune = db
268 .factory
269 .provider()
270 .unwrap()
271 .get_prune_checkpoint(PruneSegment::Headers)
272 .unwrap()
273 .and_then(|checkpoint| checkpoint.block_number)
274 .map(|block_number| block_number + 1)
275 .unwrap_or_default();
276
277 let provider = db.factory.database_provider_rw().unwrap();
278 let result = segment.prune(&provider, input.clone()).unwrap();
279 limiter.increment_deleted_entries_count_by(result.pruned);
280 trace!(target: "pruner::test",
281 expected_prune_progress=?expected_result.0,
282 expected_pruned=?expected_result.1,
283 result=?result,
284 "SegmentOutput"
285 );
286
287 assert_matches!(
288 result,
289 SegmentOutput {progress, pruned, checkpoint: Some(_)}
290 if (progress, pruned) == expected_result
291 );
292 provider
293 .save_prune_checkpoint(
294 PruneSegment::Headers,
295 result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
296 )
297 .unwrap();
298 provider.commit().expect("commit");
299
300 let last_pruned_block_number = to_block.min(
301 next_block_number_to_prune +
302 (input.limiter.deleted_entries_limit().unwrap() / HEADER_TABLES_TO_PRUNE - 1)
303 as u64,
304 );
305
306 assert_eq!(
307 db.table::<tables::CanonicalHeaders>().unwrap().len(),
308 headers.len() - (last_pruned_block_number + 1) as usize
309 );
310 assert_eq!(
311 db.table::<tables::Headers>().unwrap().len(),
312 headers.len() - (last_pruned_block_number + 1) as usize
313 );
314 assert_eq!(
315 db.table::<tables::HeaderTerminalDifficulties>().unwrap().len(),
316 headers.len() - (last_pruned_block_number + 1) as usize
317 );
318 assert_eq!(
319 db.factory.provider().unwrap().get_prune_checkpoint(PruneSegment::Headers).unwrap(),
320 Some(PruneCheckpoint {
321 block_number: Some(last_pruned_block_number),
322 tx_number: None,
323 prune_mode
324 })
325 );
326 };
327
328 test_prune(
329 3,
330 (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 9),
331 );
332 test_prune(3, (PruneProgress::Finished, 3));
333 }
334
335 #[test]
336 fn prune_cannot_be_done() {
337 let db = TestStageDB::default();
338
339 let limiter = PruneLimiter::default().set_deleted_entries_limit(0);
340
341 let input = PruneInput {
342 previous_checkpoint: None,
343 to_block: 1,
344 limiter,
346 };
347
348 let provider = db.factory.database_provider_rw().unwrap();
349 let segment = super::Headers::new(db.factory.static_file_provider());
350 let result = segment.prune(&provider, input).unwrap();
351 assert_eq!(
352 result,
353 SegmentOutput::not_done(
354 PruneInterruptReason::DeletedEntriesLimitReached,
355 Some(SegmentOutputCheckpoint::default())
356 )
357 );
358 }
359}