reth_prune/segments/static_file/
headers.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{PruneInput, Segment},
4    PruneLimiter, PrunerError,
5};
6use alloy_primitives::BlockNumber;
7use itertools::Itertools;
8use reth_db_api::{
9    cursor::{DbCursorRO, RangeWalker},
10    table::Value,
11    tables,
12    transaction::DbTxMut,
13};
14use reth_primitives_traits::NodePrimitives;
15use reth_provider::{providers::StaticFileProvider, DBProvider, StaticFileProviderFactory};
16use reth_prune_types::{
17    PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
18};
19use reth_static_file_types::StaticFileSegment;
20use std::num::NonZeroUsize;
21use tracing::trace;
22
/// Number of header-related tables pruned together for every block.
///
/// The segment below opens cursors on `Headers`, `HeaderTerminalDifficulties` and
/// `CanonicalHeaders`, and each iterator step accounts for one entry per table.
const HEADER_TABLES_TO_PRUNE: usize = 3;
25
26#[derive(Debug)]
27pub struct Headers<N> {
28    static_file_provider: StaticFileProvider<N>,
29}
30
31impl<N> Headers<N> {
32    pub const fn new(static_file_provider: StaticFileProvider<N>) -> Self {
33        Self { static_file_provider }
34    }
35}
36
37impl<Provider> Segment<Provider> for Headers<Provider::Primitives>
38where
39    Provider: StaticFileProviderFactory<Primitives: NodePrimitives<BlockHeader: Value>>
40        + DBProvider<Tx: DbTxMut>,
41{
42    fn segment(&self) -> PruneSegment {
43        PruneSegment::Headers
44    }
45
46    fn mode(&self) -> Option<PruneMode> {
47        self.static_file_provider
48            .get_highest_static_file_block(StaticFileSegment::Headers)
49            .map(PruneMode::before_inclusive)
50    }
51
52    fn purpose(&self) -> PrunePurpose {
53        PrunePurpose::StaticFile
54    }
55
56    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
57        let (block_range_start, block_range_end) = match input.get_next_block_range() {
58            Some(range) => (*range.start(), *range.end()),
59            None => {
60                trace!(target: "pruner", "No headers to prune");
61                return Ok(SegmentOutput::done())
62            }
63        };
64
65        let last_pruned_block =
66            if block_range_start == 0 { None } else { Some(block_range_start - 1) };
67
68        let range = last_pruned_block.map_or(0, |block| block + 1)..=block_range_end;
69
70        // let mut headers_cursor = provider.tx_ref().cursor_write::<tables::Headers>()?;
71        let mut headers_cursor = provider
72            .tx_ref()
73            .cursor_write::<tables::Headers<<Provider::Primitives as NodePrimitives>::BlockHeader>>(
74            )?;
75
76        let mut header_tds_cursor =
77            provider.tx_ref().cursor_write::<tables::HeaderTerminalDifficulties>()?;
78        let mut canonical_headers_cursor =
79            provider.tx_ref().cursor_write::<tables::CanonicalHeaders>()?;
80
81        let mut limiter = input.limiter.floor_deleted_entries_limit_to_multiple_of(
82            NonZeroUsize::new(HEADER_TABLES_TO_PRUNE).unwrap(),
83        );
84
85        let tables_iter = HeaderTablesIter::new(
86            provider,
87            &mut limiter,
88            headers_cursor.walk_range(range.clone())?,
89            header_tds_cursor.walk_range(range.clone())?,
90            canonical_headers_cursor.walk_range(range)?,
91        );
92
93        let mut last_pruned_block: Option<u64> = None;
94        let mut pruned = 0;
95        for res in tables_iter {
96            let HeaderTablesIterItem { pruned_block, entries_pruned } = res?;
97            last_pruned_block = Some(pruned_block);
98            pruned += entries_pruned;
99        }
100
101        let done = last_pruned_block == Some(block_range_end);
102        let progress = limiter.progress(done);
103
104        Ok(SegmentOutput {
105            progress,
106            pruned,
107            checkpoint: Some(SegmentOutputCheckpoint {
108                block_number: last_pruned_block,
109                tx_number: None,
110            }),
111        })
112    }
113}
114type Walker<'a, Provider, T> =
115    RangeWalker<'a, T, <<Provider as DBProvider>::Tx as DbTxMut>::CursorMut<T>>;
116
117#[allow(missing_debug_implementations)]
118struct HeaderTablesIter<'a, Provider>
119where
120    Provider: StaticFileProviderFactory<Primitives: NodePrimitives<BlockHeader: Value>>
121        + DBProvider<Tx: DbTxMut>,
122{
123    provider: &'a Provider,
124    limiter: &'a mut PruneLimiter,
125    headers_walker: Walker<
126        'a,
127        Provider,
128        tables::Headers<<Provider::Primitives as NodePrimitives>::BlockHeader>,
129    >,
130    header_tds_walker: Walker<'a, Provider, tables::HeaderTerminalDifficulties>,
131    canonical_headers_walker: Walker<'a, Provider, tables::CanonicalHeaders>,
132}
133
134struct HeaderTablesIterItem {
135    pruned_block: BlockNumber,
136    entries_pruned: usize,
137}
138
139impl<'a, Provider> HeaderTablesIter<'a, Provider>
140where
141    Provider: StaticFileProviderFactory<Primitives: NodePrimitives<BlockHeader: Value>>
142        + DBProvider<Tx: DbTxMut>,
143{
144    const fn new(
145        provider: &'a Provider,
146        limiter: &'a mut PruneLimiter,
147        headers_walker: Walker<
148            'a,
149            Provider,
150            tables::Headers<<Provider::Primitives as NodePrimitives>::BlockHeader>,
151        >,
152        header_tds_walker: Walker<'a, Provider, tables::HeaderTerminalDifficulties>,
153        canonical_headers_walker: Walker<'a, Provider, tables::CanonicalHeaders>,
154    ) -> Self {
155        Self { provider, limiter, headers_walker, header_tds_walker, canonical_headers_walker }
156    }
157}
158
159impl<Provider> Iterator for HeaderTablesIter<'_, Provider>
160where
161    Provider: StaticFileProviderFactory<Primitives: NodePrimitives<BlockHeader: Value>>
162        + DBProvider<Tx: DbTxMut>,
163{
164    type Item = Result<HeaderTablesIterItem, PrunerError>;
165    fn next(&mut self) -> Option<Self::Item> {
166        if self.limiter.is_limit_reached() {
167            return None
168        }
169
170        let mut pruned_block_headers = None;
171        let mut pruned_block_td = None;
172        let mut pruned_block_canonical = None;
173
174        if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
175            &mut self.headers_walker,
176            self.limiter,
177            &mut |_| false,
178            &mut |row| pruned_block_headers = Some(row.0),
179        ) {
180            return Some(Err(err.into()))
181        }
182
183        if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
184            &mut self.header_tds_walker,
185            self.limiter,
186            &mut |_| false,
187            &mut |row| pruned_block_td = Some(row.0),
188        ) {
189            return Some(Err(err.into()))
190        }
191
192        if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
193            &mut self.canonical_headers_walker,
194            self.limiter,
195            &mut |_| false,
196            &mut |row| pruned_block_canonical = Some(row.0),
197        ) {
198            return Some(Err(err.into()))
199        }
200
201        if ![pruned_block_headers, pruned_block_td, pruned_block_canonical].iter().all_equal() {
202            return Some(Err(PrunerError::InconsistentData(
203                "All headers-related tables should be pruned up to the same height",
204            )))
205        }
206
207        pruned_block_headers.map(move |block| {
208            Ok(HeaderTablesIterItem { pruned_block: block, entries_pruned: HEADER_TABLES_TO_PRUNE })
209        })
210    }
211}
212
#[cfg(test)]
mod tests {
    use crate::segments::{
        static_file::headers::HEADER_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
        SegmentOutput,
    };
    use alloy_primitives::{BlockNumber, B256, U256};
    use assert_matches::assert_matches;
    use reth_db_api::{tables, transaction::DbTx};
    use reth_provider::{
        DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
        StaticFileProviderFactory,
    };
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
        SegmentOutputCheckpoint,
    };
    use reth_stages::test_utils::TestStageDB;
    use reth_testing_utils::{generators, generators::random_header_range};
    use tracing::trace;

    /// Prunes 100 inserted headers up to block 3 in two runs with an entry limit of 10 and
    /// checks the progress, the table sizes and the stored checkpoint after each run.
    #[test]
    fn prune() {
        reth_tracing::init_test_tracing();

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Insert headers for blocks 0..100.
        let headers = random_header_range(&mut rng, 0..100, B256::ZERO);
        let tx = db.factory.provider_rw().unwrap().into_tx();
        for header in &headers {
            TestStageDB::insert_header(None, &tx, header, U256::ZERO).unwrap();
        }
        tx.commit().unwrap();

        // Every header table starts with one row per inserted header.
        assert_eq!(db.table::<tables::CanonicalHeaders>().unwrap().len(), headers.len());
        assert_eq!(db.table::<tables::Headers>().unwrap().len(), headers.len());
        assert_eq!(db.table::<tables::HeaderTerminalDifficulties>().unwrap().len(), headers.len());

        // Runs one prune pass up to `to_block` and asserts `(progress, pruned)` as well as the
        // resulting database state.
        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let segment = super::Headers::new(db.factory.static_file_provider());
            let prune_mode = PruneMode::Before(to_block);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::Headers)
                    .unwrap(),
                to_block,
                limiter: limiter.clone(),
            };

            // First block this pass is expected to prune: one past the stored checkpoint, or 0
            // when there is no checkpoint yet.
            let next_block_number_to_prune = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::Headers)
                .unwrap()
                .and_then(|checkpoint| checkpoint.block_number)
                .map(|block_number| block_number + 1)
                .unwrap_or_default();

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input.clone()).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);
            trace!(target: "pruner::test",
                expected_prune_progress=?expected_result.0,
                expected_pruned=?expected_result.1,
                result=?result,
                "SegmentOutput"
            );

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );
            provider
                .save_prune_checkpoint(
                    PruneSegment::Headers,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // A limit of 10 entries covers 10 / 3 = 3 whole blocks per pass, capped at
            // `to_block`.
            let last_pruned_block_number = to_block.min(
                next_block_number_to_prune +
                    (input.limiter.deleted_entries_limit().unwrap() / HEADER_TABLES_TO_PRUNE - 1)
                        as u64,
            );

            // Blocks 0..=last_pruned_block_number must be gone from all three tables.
            assert_eq!(
                db.table::<tables::CanonicalHeaders>().unwrap().len(),
                headers.len() - (last_pruned_block_number + 1) as usize
            );
            assert_eq!(
                db.table::<tables::Headers>().unwrap().len(),
                headers.len() - (last_pruned_block_number + 1) as usize
            );
            assert_eq!(
                db.table::<tables::HeaderTerminalDifficulties>().unwrap().len(),
                headers.len() - (last_pruned_block_number + 1) as usize
            );
            assert_eq!(
                db.factory.provider().unwrap().get_prune_checkpoint(PruneSegment::Headers).unwrap(),
                Some(PruneCheckpoint {
                    block_number: Some(last_pruned_block_number),
                    tx_number: None,
                    prune_mode
                })
            );
        };

        // First pass: 9 entries (blocks 0..=2) are pruned before the limit stops the run.
        test_prune(
            3,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 9),
        );
        // Second pass: the remaining block 3 (3 entries) is pruned and the segment finishes.
        test_prune(3, (PruneProgress::Finished, 3));
    }

    /// With an entry limit of 0 nothing can be pruned: the segment reports the limit as the
    /// interrupt reason and returns a default checkpoint.
    #[test]
    fn prune_cannot_be_done() {
        let db = TestStageDB::default();

        let limiter = PruneLimiter::default().set_deleted_entries_limit(0);

        let input = PruneInput {
            previous_checkpoint: None,
            to_block: 1,
            // Less than total number of tables for `Headers` segment
            limiter,
        };

        let provider = db.factory.database_provider_rw().unwrap();
        let segment = super::Headers::new(db.factory.static_file_provider());
        let result = segment.prune(&provider, input).unwrap();
        assert_eq!(
            result,
            SegmentOutput::not_done(
                PruneInterruptReason::DeletedEntriesLimitReached,
                Some(SegmentOutputCheckpoint::default())
            )
        );
    }
}