reth_prune/segments/static_file/
headers.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{PruneInput, Segment},
4    PruneLimiter, PrunerError,
5};
6use alloy_primitives::BlockNumber;
7use itertools::Itertools;
8use reth_db_api::{
9    cursor::{DbCursorRO, RangeWalker},
10    tables,
11    transaction::DbTxMut,
12};
13use reth_provider::{providers::StaticFileProvider, DBProvider, StaticFileProviderFactory};
14use reth_prune_types::{
15    PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
16};
17use reth_static_file_types::StaticFileSegment;
18use std::num::NonZeroUsize;
19use tracing::trace;
20
/// Number of header tables pruned together in one step.
///
/// The deleted-entries limit is floored to a multiple of this value before pruning starts, and
/// every item yielded by the header tables iterator reports this many pruned entries.
const HEADER_TABLES_TO_PRUNE: usize = 3;
23
/// Prune segment for header data, parameterized by the primitives type `N` of the static file
/// provider it holds.
#[derive(Debug)]
pub struct Headers<N> {
    /// Static file provider queried for the highest block of the headers static file segment.
    static_file_provider: StaticFileProvider<N>,
}
28
impl<N> Headers<N> {
    /// Creates a new [`Headers`] segment that wraps the given static file provider.
    pub const fn new(static_file_provider: StaticFileProvider<N>) -> Self {
        Self { static_file_provider }
    }
}
34
35impl<Provider: StaticFileProviderFactory + DBProvider<Tx: DbTxMut>> Segment<Provider>
36    for Headers<Provider::Primitives>
37{
38    fn segment(&self) -> PruneSegment {
39        PruneSegment::Headers
40    }
41
42    fn mode(&self) -> Option<PruneMode> {
43        self.static_file_provider
44            .get_highest_static_file_block(StaticFileSegment::Headers)
45            .map(PruneMode::before_inclusive)
46    }
47
48    fn purpose(&self) -> PrunePurpose {
49        PrunePurpose::StaticFile
50    }
51
52    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
53        let (block_range_start, block_range_end) = match input.get_next_block_range() {
54            Some(range) => (*range.start(), *range.end()),
55            None => {
56                trace!(target: "pruner", "No headers to prune");
57                return Ok(SegmentOutput::done())
58            }
59        };
60
61        let last_pruned_block =
62            if block_range_start == 0 { None } else { Some(block_range_start - 1) };
63
64        let range = last_pruned_block.map_or(0, |block| block + 1)..=block_range_end;
65
66        let mut headers_cursor = provider.tx_ref().cursor_write::<tables::Headers>()?;
67        let mut header_tds_cursor =
68            provider.tx_ref().cursor_write::<tables::HeaderTerminalDifficulties>()?;
69        let mut canonical_headers_cursor =
70            provider.tx_ref().cursor_write::<tables::CanonicalHeaders>()?;
71
72        let mut limiter = input.limiter.floor_deleted_entries_limit_to_multiple_of(
73            NonZeroUsize::new(HEADER_TABLES_TO_PRUNE).unwrap(),
74        );
75
76        let tables_iter = HeaderTablesIter::new(
77            provider,
78            &mut limiter,
79            headers_cursor.walk_range(range.clone())?,
80            header_tds_cursor.walk_range(range.clone())?,
81            canonical_headers_cursor.walk_range(range)?,
82        );
83
84        let mut last_pruned_block: Option<u64> = None;
85        let mut pruned = 0;
86        for res in tables_iter {
87            let HeaderTablesIterItem { pruned_block, entries_pruned } = res?;
88            last_pruned_block = Some(pruned_block);
89            pruned += entries_pruned;
90        }
91
92        let done = last_pruned_block == Some(block_range_end);
93        let progress = limiter.progress(done);
94
95        Ok(SegmentOutput {
96            progress,
97            pruned,
98            checkpoint: Some(SegmentOutputCheckpoint {
99                block_number: last_pruned_block,
100                tx_number: None,
101            }),
102        })
103    }
104}
/// Range walker over table `T` that uses the mutable cursor type of the provider's transaction.
type Walker<'a, Provider, T> =
    RangeWalker<'a, T, <<Provider as DBProvider>::Tx as DbTxMut>::CursorMut<T>>;
107
/// Iterator that prunes the three header-related tables one step at a time, in lockstep.
// NOTE(review): `Debug` is not derived here; presumably one of the fields does not implement it
// — confirm before removing the `allow`.
#[allow(missing_debug_implementations)]
struct HeaderTablesIter<'a, Provider>
where
    Provider: DBProvider<Tx: DbTxMut>,
{
    /// Provider whose transaction performs the pruning.
    provider: &'a Provider,
    /// Limiter consulted before each step and passed to every table pruning call.
    limiter: &'a mut PruneLimiter,
    /// Walker over the `Headers` table.
    headers_walker: Walker<'a, Provider, tables::Headers>,
    /// Walker over the `HeaderTerminalDifficulties` table.
    header_tds_walker: Walker<'a, Provider, tables::HeaderTerminalDifficulties>,
    /// Walker over the `CanonicalHeaders` table.
    canonical_headers_walker: Walker<'a, Provider, tables::CanonicalHeaders>,
}
119
/// Result of one successful step of [`HeaderTablesIter`].
struct HeaderTablesIterItem {
    /// Block number whose rows were pruned in this step.
    pruned_block: BlockNumber,
    /// Number of table entries reported as pruned by this step.
    entries_pruned: usize,
}
124
impl<'a, Provider> HeaderTablesIter<'a, Provider>
where
    Provider: DBProvider<Tx: DbTxMut>,
{
    /// Builds the iterator from a provider, a limiter and one walker per header-related table.
    const fn new(
        provider: &'a Provider,
        limiter: &'a mut PruneLimiter,
        headers_walker: Walker<'a, Provider, tables::Headers>,
        header_tds_walker: Walker<'a, Provider, tables::HeaderTerminalDifficulties>,
        canonical_headers_walker: Walker<'a, Provider, tables::CanonicalHeaders>,
    ) -> Self {
        Self { provider, limiter, headers_walker, header_tds_walker, canonical_headers_walker }
    }
}
139
140impl<Provider> Iterator for HeaderTablesIter<'_, Provider>
141where
142    Provider: DBProvider<Tx: DbTxMut>,
143{
144    type Item = Result<HeaderTablesIterItem, PrunerError>;
145    fn next(&mut self) -> Option<Self::Item> {
146        if self.limiter.is_limit_reached() {
147            return None
148        }
149
150        let mut pruned_block_headers = None;
151        let mut pruned_block_td = None;
152        let mut pruned_block_canonical = None;
153
154        if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
155            &mut self.headers_walker,
156            self.limiter,
157            &mut |_| false,
158            &mut |row| pruned_block_headers = Some(row.0),
159        ) {
160            return Some(Err(err.into()))
161        }
162
163        if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
164            &mut self.header_tds_walker,
165            self.limiter,
166            &mut |_| false,
167            &mut |row| pruned_block_td = Some(row.0),
168        ) {
169            return Some(Err(err.into()))
170        }
171
172        if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
173            &mut self.canonical_headers_walker,
174            self.limiter,
175            &mut |_| false,
176            &mut |row| pruned_block_canonical = Some(row.0),
177        ) {
178            return Some(Err(err.into()))
179        }
180
181        if ![pruned_block_headers, pruned_block_td, pruned_block_canonical].iter().all_equal() {
182            return Some(Err(PrunerError::InconsistentData(
183                "All headers-related tables should be pruned up to the same height",
184            )))
185        }
186
187        pruned_block_headers.map(move |block| {
188            Ok(HeaderTablesIterItem { pruned_block: block, entries_pruned: HEADER_TABLES_TO_PRUNE })
189        })
190    }
191}
192
#[cfg(test)]
mod tests {
    use crate::segments::{
        static_file::headers::HEADER_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
        SegmentOutput,
    };
    use alloy_primitives::{BlockNumber, B256, U256};
    use assert_matches::assert_matches;
    use reth_db_api::{tables, transaction::DbTx};
    use reth_provider::{
        DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
        StaticFileProviderFactory,
    };
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
        SegmentOutputCheckpoint,
    };
    use reth_stages::test_utils::TestStageDB;
    use reth_testing_utils::{generators, generators::random_header_range};
    use tracing::trace;

    /// Prunes headers in two runs against the same target block and checks the reported
    /// progress, the remaining table sizes and the stored checkpoint after each run.
    #[test]
    fn prune() {
        reth_tracing::init_test_tracing();

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Populate the header tables with random headers for the block range 0..100.
        let headers = random_header_range(&mut rng, 0..100, B256::ZERO);
        let tx = db.factory.provider_rw().unwrap().into_tx();
        for header in &headers {
            TestStageDB::insert_header(None, &tx, header, U256::ZERO).unwrap();
        }
        tx.commit().unwrap();

        // Sanity check: every header-related table holds one entry per generated header.
        assert_eq!(db.table::<tables::CanonicalHeaders>().unwrap().len(), headers.len());
        assert_eq!(db.table::<tables::Headers>().unwrap().len(), headers.len());
        assert_eq!(db.table::<tables::HeaderTerminalDifficulties>().unwrap().len(), headers.len());

        // Runs one prune pass up to `to_block` with a deleted-entries limit of 10 and verifies
        // the outcome against `expected_result` (progress, number of pruned entries).
        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let segment = super::Headers::new(db.factory.static_file_provider());
            let prune_mode = PruneMode::Before(to_block);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::Headers)
                    .unwrap(),
                to_block,
                limiter: limiter.clone(),
            };

            // First block this run is expected to prune: one past the stored checkpoint, or 0
            // when no checkpoint block exists yet.
            let next_block_number_to_prune = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::Headers)
                .unwrap()
                .and_then(|checkpoint| checkpoint.block_number)
                .map(|block_number| block_number + 1)
                .unwrap_or_default();

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input.clone()).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);
            trace!(target: "pruner::test",
                expected_prune_progress=?expected_result.0,
                expected_pruned=?expected_result.1,
                result=?result,
                "SegmentOutput"
            );

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );
            // Persist the checkpoint so the next run continues from it.
            provider
                .save_prune_checkpoint(
                    PruneSegment::Headers,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // Each pruned block removes one entry from each of the `HEADER_TABLES_TO_PRUNE`
            // tables, so a single run covers at most `limit / HEADER_TABLES_TO_PRUNE` blocks and
            // never goes past `to_block`.
            let last_pruned_block_number = to_block.min(
                next_block_number_to_prune +
                    (input.limiter.deleted_entries_limit().unwrap() / HEADER_TABLES_TO_PRUNE - 1)
                        as u64,
            );

            assert_eq!(
                db.table::<tables::CanonicalHeaders>().unwrap().len(),
                headers.len() - (last_pruned_block_number + 1) as usize
            );
            assert_eq!(
                db.table::<tables::Headers>().unwrap().len(),
                headers.len() - (last_pruned_block_number + 1) as usize
            );
            assert_eq!(
                db.table::<tables::HeaderTerminalDifficulties>().unwrap().len(),
                headers.len() - (last_pruned_block_number + 1) as usize
            );
            assert_eq!(
                db.factory.provider().unwrap().get_prune_checkpoint(PruneSegment::Headers).unwrap(),
                Some(PruneCheckpoint {
                    block_number: Some(last_pruned_block_number),
                    tx_number: None,
                    prune_mode
                })
            );
        };

        // First run: 9 entries are expected to be pruned before the limit interrupts the run.
        test_prune(
            3,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 9),
        );
        // Second run: the remaining 3 entries up to block 3 are pruned and the segment finishes.
        test_prune(3, (PruneProgress::Finished, 3));
    }

    /// With a deleted-entries limit of 0 nothing can be pruned: the segment must report the
    /// limit as the interrupt reason together with a default checkpoint.
    #[test]
    fn prune_cannot_be_done() {
        let db = TestStageDB::default();

        let limiter = PruneLimiter::default().set_deleted_entries_limit(0);

        let input = PruneInput {
            previous_checkpoint: None,
            to_block: 1,
            // Less than total number of tables for `Headers` segment
            limiter,
        };

        let provider = db.factory.database_provider_rw().unwrap();
        let segment = super::Headers::new(db.factory.static_file_provider());
        let result = segment.prune(&provider, input).unwrap();
        assert_eq!(
            result,
            SegmentOutput::not_done(
                PruneInterruptReason::DeletedEntriesLimitReached,
                Some(SegmentOutputCheckpoint::default())
            )
        );
    }
}