reth_prune/
pruner.rs

1//! Support for pruning.
2
3use crate::{
4    segments::{PruneInput, Segment},
5    Metrics, PruneLimiter, PrunerError, PrunerEvent,
6};
7use alloy_primitives::BlockNumber;
8use reth_exex_types::FinishedExExHeight;
9use reth_provider::{
10    DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
11    StageCheckpointReader,
12};
13use reth_prune_types::{PruneProgress, PrunedSegmentInfo, PrunerOutput};
14use reth_stages_types::StageId;
15use reth_tokio_util::{EventSender, EventStream};
16use std::time::{Duration, Instant};
17use tokio::sync::watch;
18use tracing::debug;
19
20/// Result of [`Pruner::run`] execution.
21pub type PrunerResult = Result<PrunerOutput, PrunerError>;
22
23/// The pruner type itself with the result of [`Pruner::run`]
24pub type PrunerWithResult<S, DB> = (Pruner<S, DB>, PrunerResult);
25
26/// Pruner with preset provider factory.
27pub type PrunerWithFactory<PF> = Pruner<<PF as DatabaseProviderFactory>::ProviderRW, PF>;
28
29/// Pruning routine. Main pruning logic happens in [`Pruner::run`].
30#[derive(Debug)]
31pub struct Pruner<Provider, PF> {
32    /// Provider factory. If pruner is initialized without it, it will be set to `()`.
33    provider_factory: PF,
34    segments: Vec<Box<dyn Segment<Provider>>>,
35    /// Minimum pruning interval measured in blocks. All prune segments are checked and, if needed,
36    /// pruned, when the chain advances by the specified number of blocks.
37    min_block_interval: usize,
38    /// Previous tip block number when the pruner was run. Even if no data was pruned, this block
39    /// number is updated with the tip block number the pruner was called with. It's used in
40    /// conjunction with `min_block_interval` to determine when the pruning needs to be initiated.
41    previous_tip_block_number: Option<BlockNumber>,
42    /// Maximum total entries to prune (delete from database) per run.
43    delete_limit: usize,
44    /// Maximum time for one pruner run.
45    timeout: Option<Duration>,
46    /// The finished height of all `ExEx`'s.
47    finished_exex_height: watch::Receiver<FinishedExExHeight>,
48    #[doc(hidden)]
49    metrics: Metrics,
50    event_sender: EventSender<PrunerEvent>,
51}
52
53impl<Provider> Pruner<Provider, ()> {
54    /// Creates a new [Pruner] without a provider factory.
55    pub fn new(
56        segments: Vec<Box<dyn Segment<Provider>>>,
57        min_block_interval: usize,
58        delete_limit: usize,
59        timeout: Option<Duration>,
60        finished_exex_height: watch::Receiver<FinishedExExHeight>,
61    ) -> Self {
62        Self {
63            provider_factory: (),
64            segments,
65            min_block_interval,
66            previous_tip_block_number: None,
67            delete_limit,
68            timeout,
69            finished_exex_height,
70            metrics: Metrics::default(),
71            event_sender: Default::default(),
72        }
73    }
74}
75
76impl<PF> Pruner<PF::ProviderRW, PF>
77where
78    PF: DatabaseProviderFactory,
79{
80    /// Crates a new pruner with the given provider factory.
81    pub fn new_with_factory(
82        provider_factory: PF,
83        segments: Vec<Box<dyn Segment<PF::ProviderRW>>>,
84        min_block_interval: usize,
85        delete_limit: usize,
86        timeout: Option<Duration>,
87        finished_exex_height: watch::Receiver<FinishedExExHeight>,
88    ) -> Self {
89        Self {
90            provider_factory,
91            segments,
92            min_block_interval,
93            previous_tip_block_number: None,
94            delete_limit,
95            timeout,
96            finished_exex_height,
97            metrics: Metrics::default(),
98            event_sender: Default::default(),
99        }
100    }
101}
102
103impl<Provider, S> Pruner<Provider, S>
104where
105    Provider: PruneCheckpointReader + PruneCheckpointWriter + StageCheckpointReader,
106{
107    /// Listen for events on the pruner.
108    pub fn events(&self) -> EventStream<PrunerEvent> {
109        self.event_sender.new_listener()
110    }
111
112    /// Run the pruner with the given provider. This will only prune data up to the highest finished
113    /// `ExEx` height, if there are no `ExExes`.
114    ///
115    /// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
116    /// to prune.
117    pub fn run_with_provider(
118        &mut self,
119        provider: &Provider,
120        tip_block_number: BlockNumber,
121    ) -> PrunerResult {
122        let Some(tip_block_number) =
123            self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
124        else {
125            return Ok(PruneProgress::Finished.into())
126        };
127        if tip_block_number == 0 {
128            self.previous_tip_block_number = Some(tip_block_number);
129
130            debug!(target: "pruner", %tip_block_number, "Nothing to prune yet");
131            return Ok(PruneProgress::Finished.into())
132        }
133
134        self.event_sender.notify(PrunerEvent::Started { tip_block_number });
135
136        debug!(target: "pruner", %tip_block_number, "Pruner started");
137        let start = Instant::now();
138
139        let mut limiter = PruneLimiter::default().set_deleted_entries_limit(self.delete_limit);
140        if let Some(timeout) = self.timeout {
141            limiter = limiter.set_time_limit(timeout);
142        };
143
144        let (stats, deleted_entries, output) =
145            self.prune_segments(provider, tip_block_number, &mut limiter)?;
146
147        self.previous_tip_block_number = Some(tip_block_number);
148
149        let elapsed = start.elapsed();
150        self.metrics.duration_seconds.record(elapsed);
151
152        let message = match output.progress {
153            PruneProgress::HasMoreData(_) => "Pruner interrupted and has more data to prune",
154            PruneProgress::Finished => "Pruner finished",
155        };
156
157        debug!(
158            target: "pruner",
159            %tip_block_number,
160            ?elapsed,
161            ?deleted_entries,
162            ?limiter,
163            ?output,
164            ?stats,
165            "{message}",
166        );
167
168        self.event_sender.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats });
169
170        Ok(output)
171    }
172
173    /// Prunes the segments that the [Pruner] was initialized with, and the segments that needs to
174    /// be pruned according to the highest `static_files`. Segments are parts of the database that
175    /// represent one or more tables.
176    ///
177    /// Returns a list of stats per pruned segment, total number of entries pruned, and
178    /// [`PruneProgress`].
179    fn prune_segments(
180        &mut self,
181        provider: &Provider,
182        tip_block_number: BlockNumber,
183        limiter: &mut PruneLimiter,
184    ) -> Result<(Vec<PrunedSegmentInfo>, usize, PrunerOutput), PrunerError> {
185        let mut stats = Vec::with_capacity(self.segments.len());
186        let mut pruned = 0;
187        let mut output = PrunerOutput {
188            progress: PruneProgress::Finished,
189            segments: Vec::with_capacity(self.segments.len()),
190        };
191
192        for segment in &self.segments {
193            if limiter.is_limit_reached() {
194                break
195            }
196
197            if let Some((to_block, prune_mode)) = segment
198                .mode()
199                .map(|mode| {
200                    mode.prune_target_block(tip_block_number, segment.segment(), segment.purpose())
201                })
202                .transpose()?
203                .flatten()
204            {
205                // Check if segment has a required stage that must be finished first
206                if let Some(required_stage) = segment.required_stage() &&
207                    !is_stage_finished(provider, required_stage)?
208                {
209                    debug!(
210                        target: "pruner",
211                        segment = ?segment.segment(),
212                        ?required_stage,
213                        "Segment's required stage not finished, skipping"
214                    );
215                    continue
216                }
217
218                debug!(
219                    target: "pruner",
220                    segment = ?segment.segment(),
221                    purpose = ?segment.purpose(),
222                    %to_block,
223                    ?prune_mode,
224                    "Segment pruning started"
225                );
226
227                let segment_start = Instant::now();
228                let previous_checkpoint = provider.get_prune_checkpoint(segment.segment())?;
229                let segment_output = segment.prune(
230                    provider,
231                    PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() },
232                )?;
233                if let Some(checkpoint) = segment_output.checkpoint {
234                    segment
235                        .save_checkpoint(provider, checkpoint.as_prune_checkpoint(prune_mode))?;
236                }
237                self.metrics
238                    .get_prune_segment_metrics(segment.segment())
239                    .duration_seconds
240                    .record(segment_start.elapsed());
241                if let Some(highest_pruned_block) =
242                    segment_output.checkpoint.and_then(|checkpoint| checkpoint.block_number)
243                {
244                    self.metrics
245                        .get_prune_segment_metrics(segment.segment())
246                        .highest_pruned_block
247                        .set(highest_pruned_block as f64);
248                }
249
250                output.progress = segment_output.progress;
251                output.segments.push((segment.segment(), segment_output));
252
253                debug!(
254                    target: "pruner",
255                    segment = ?segment.segment(),
256                    purpose = ?segment.purpose(),
257                    %to_block,
258                    ?prune_mode,
259                    %segment_output.pruned,
260                    "Segment pruning finished"
261                );
262
263                if segment_output.pruned > 0 {
264                    limiter.increment_deleted_entries_count_by(segment_output.pruned);
265                    pruned += segment_output.pruned;
266                    let info = PrunedSegmentInfo {
267                        segment: segment.segment(),
268                        pruned: segment_output.pruned,
269                        progress: segment_output.progress,
270                    };
271                    stats.push(info);
272                }
273            } else {
274                debug!(target: "pruner", segment = ?segment.segment(), purpose = ?segment.purpose(), "Nothing to prune for the segment");
275            }
276        }
277
278        Ok((stats, pruned, output))
279    }
280
281    /// Returns `true` if the pruning is needed at the provided tip block number.
282    /// This is determined by the check against minimum pruning interval and last pruned block
283    /// number.
284    pub fn is_pruning_needed(&self, tip_block_number: BlockNumber) -> bool {
285        let Some(tip_block_number) =
286            self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
287        else {
288            return false
289        };
290
291        // Saturating subtraction is needed for the case when the chain was reverted, meaning
292        // current block number might be less than the previous tip block number.
293        // If that's the case, no pruning is needed as outdated data is also reverted.
294        if tip_block_number.saturating_sub(self.previous_tip_block_number.unwrap_or_default()) >=
295            self.min_block_interval as u64
296        {
297            debug!(
298                target: "pruner",
299                previous_tip_block_number = ?self.previous_tip_block_number,
300                %tip_block_number,
301                "Minimum pruning interval reached"
302            );
303            true
304        } else {
305            false
306        }
307    }
308
309    /// Adjusts the tip block number to the finished `ExEx` height. This is needed to not prune more
310    /// data than `ExExs` have processed. Depending on the height:
311    /// - [`FinishedExExHeight::NoExExs`] returns the tip block number as no adjustment for `ExExs`
312    ///   is needed.
313    /// - [`FinishedExExHeight::NotReady`] returns `None` as not all `ExExs` have emitted a
314    ///   `FinishedHeight` event yet.
315    /// - [`FinishedExExHeight::Height`] returns the finished `ExEx` height.
316    fn adjust_tip_block_number_to_finished_exex_height(
317        &self,
318        tip_block_number: BlockNumber,
319    ) -> Option<BlockNumber> {
320        match *self.finished_exex_height.borrow() {
321            FinishedExExHeight::NoExExs => Some(tip_block_number),
322            FinishedExExHeight::NotReady => {
323                debug!(target: "pruner", %tip_block_number, "Not all ExExs have emitted a `FinishedHeight` event yet, can't prune");
324                None
325            }
326            FinishedExExHeight::Height(finished_exex_height) => {
327                debug!(target: "pruner", %tip_block_number, %finished_exex_height, "Adjusting tip block number to the finished ExEx height");
328                Some(finished_exex_height)
329            }
330        }
331    }
332}
333
334impl<PF> Pruner<PF::ProviderRW, PF>
335where
336    PF: DatabaseProviderFactory<
337        ProviderRW: PruneCheckpointWriter + PruneCheckpointReader + StageCheckpointReader,
338    >,
339{
340    /// Run the pruner. This will only prune data up to the highest finished ExEx height, if there
341    /// are no ExExes.
342    ///
343    /// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
344    /// to prune.
345    pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
346        let provider = self.provider_factory.database_provider_rw()?;
347        let result = self.run_with_provider(&provider, tip_block_number);
348        provider.commit()?;
349        result
350    }
351}
352
353/// Checks if the given stage has caught up with the `Finish` stage.
354///
355/// Returns `true` if the stage checkpoint is >= the Finish stage checkpoint.
356fn is_stage_finished<Provider: StageCheckpointReader>(
357    provider: &Provider,
358    stage_id: StageId,
359) -> Result<bool, PrunerError> {
360    let stage_checkpoint = provider.get_stage_checkpoint(stage_id)?.map(|c| c.block_number);
361    let finish_checkpoint = provider.get_stage_checkpoint(StageId::Finish)?.map(|c| c.block_number);
362
363    Ok(stage_checkpoint >= finish_checkpoint)
364}
365
#[cfg(test)]
mod tests {
    use crate::Pruner;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;

    /// Walks the pruner through the minimum-interval check and the `ExEx` height adjustments.
    #[test]
    fn is_pruning_needed() {
        let factory = create_test_provider_factory();

        let (exex_height_tx, exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let mut pruner = Pruner::new_with_factory(factory, vec![], 5, 0, None, exex_height_rx);
        let interval = pruner.min_block_interval as u64;

        // Nothing was pruned before and block 1 is still below the interval.
        let first = 1;
        assert!(!pruner.is_pruning_needed(first));
        pruner.previous_tip_block_number = Some(first);

        // Advancing by exactly the interval is enough.
        let second = first + interval;
        assert!(pruner.is_pruning_needed(second));
        pruner.previous_tip_block_number = Some(second);

        // Same tip as the previous run: delta of 0 is below the interval.
        assert!(!pruner.is_pruning_needed(second));

        // Another full interval ahead.
        let third = second + interval;
        assert!(pruner.is_pruning_needed(third));

        // ExExs that have not reported a finished height block pruning entirely.
        exex_height_tx.send(FinishedExExHeight::NotReady).unwrap();
        assert!(!pruner.is_pruning_needed(third));

        // Tip clamped to an ExEx height that falls short of the threshold.
        exex_height_tx.send(FinishedExExHeight::Height(second)).unwrap();
        assert!(!pruner.is_pruning_needed(third));

        // Tip clamped to an ExEx height that reaches the threshold.
        exex_height_tx.send(FinishedExExHeight::Height(third)).unwrap();
        assert!(pruner.is_pruning_needed(third));
    }
}