Skip to main content

reth_prune/
pruner.rs

1//! Support for pruning.
2
3use crate::{
4    segments::{PruneInput, Segment},
5    Metrics, PruneLimiter, PrunerError, PrunerEvent,
6};
7use alloy_primitives::BlockNumber;
8use reth_exex_types::FinishedExExHeight;
9use reth_primitives_traits::FastInstant as Instant;
10use reth_provider::{
11    DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
12    StageCheckpointReader,
13};
14use reth_prune_types::{PruneProgress, PrunedSegmentInfo, PrunerOutput};
15use reth_stages_types::StageId;
16use reth_tokio_util::{EventSender, EventStream};
17use std::time::Duration;
18use tokio::sync::watch;
19use tracing::{debug, instrument};
20
21/// Result of [`Pruner::run`] execution.
22pub type PrunerResult = Result<PrunerOutput, PrunerError>;
23
24/// The pruner type itself with the result of [`Pruner::run`]
25pub type PrunerWithResult<S, DB> = (Pruner<S, DB>, PrunerResult);
26
27/// Pruner with preset provider factory.
28pub type PrunerWithFactory<PF> = Pruner<<PF as DatabaseProviderFactory>::ProviderRW, PF>;
29
30/// Pruning routine. Main pruning logic happens in [`Pruner::run`].
31#[derive(Debug)]
32pub struct Pruner<Provider, PF> {
33    /// Provider factory. If pruner is initialized without it, it will be set to `()`.
34    provider_factory: PF,
35    segments: Vec<Box<dyn Segment<Provider>>>,
36    /// Minimum pruning interval measured in blocks. All prune segments are checked and, if needed,
37    /// pruned, when the chain advances by the specified number of blocks.
38    min_block_interval: usize,
39    /// Previous tip block number when the pruner was run. Even if no data was pruned, this block
40    /// number is updated with the tip block number the pruner was called with. It's used in
41    /// conjunction with `min_block_interval` to determine when the pruning needs to be initiated.
42    previous_tip_block_number: Option<BlockNumber>,
43    /// Maximum total entries to prune (delete from database) per run.
44    delete_limit: usize,
45    /// Maximum time for one pruner run.
46    timeout: Option<Duration>,
47    /// Optional override for the minimum pruning distance. When set, this replaces the
48    /// per-segment hardcoded minimums (e.g. `MINIMUM_UNWIND_SAFE_DISTANCE`).
49    minimum_pruning_distance: Option<u64>,
50    /// The finished height of all `ExEx`'s.
51    finished_exex_height: watch::Receiver<FinishedExExHeight>,
52    #[doc(hidden)]
53    metrics: Metrics,
54    event_sender: EventSender<PrunerEvent>,
55}
56
57impl<Provider> Pruner<Provider, ()> {
58    /// Creates a new [Pruner] without a provider factory.
59    pub fn new(
60        segments: Vec<Box<dyn Segment<Provider>>>,
61        min_block_interval: usize,
62        delete_limit: usize,
63        timeout: Option<Duration>,
64        finished_exex_height: watch::Receiver<FinishedExExHeight>,
65    ) -> Self {
66        Self {
67            provider_factory: (),
68            segments,
69            min_block_interval,
70            previous_tip_block_number: None,
71            delete_limit,
72            timeout,
73            minimum_pruning_distance: None,
74            finished_exex_height,
75            metrics: Metrics::default(),
76            event_sender: Default::default(),
77        }
78    }
79}
80
81impl<PF> Pruner<PF::ProviderRW, PF>
82where
83    PF: DatabaseProviderFactory,
84{
85    /// Creates a new pruner with the given provider factory.
86    pub fn new_with_factory(
87        provider_factory: PF,
88        segments: Vec<Box<dyn Segment<PF::ProviderRW>>>,
89        min_block_interval: usize,
90        delete_limit: usize,
91        timeout: Option<Duration>,
92        finished_exex_height: watch::Receiver<FinishedExExHeight>,
93    ) -> Self {
94        Self {
95            provider_factory,
96            segments,
97            min_block_interval,
98            previous_tip_block_number: None,
99            delete_limit,
100            timeout,
101            minimum_pruning_distance: None,
102            finished_exex_height,
103            metrics: Metrics::default(),
104            event_sender: Default::default(),
105        }
106    }
107}
108
109impl<Provider, S> Pruner<Provider, S> {
110    /// Sets the minimum pruning distance, overriding per-segment hardcoded minimums.
111    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
112        self.minimum_pruning_distance = Some(distance);
113        self
114    }
115}
116
117impl<Provider, S> Pruner<Provider, S>
118where
119    Provider: PruneCheckpointReader + PruneCheckpointWriter + StageCheckpointReader,
120{
121    /// Listen for events on the pruner.
122    pub fn events(&self) -> EventStream<PrunerEvent> {
123        self.event_sender.new_listener()
124    }
125
126    /// Run the pruner with the given provider. This will only prune data up to the highest finished
127    /// `ExEx` height, if there are no `ExExes`.
128    ///
129    /// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
130    /// to prune.
131    #[instrument(
132        name = "Pruner::run_with_provider",
133        level = "debug",
134        target = "pruner",
135        skip(self, provider)
136    )]
137    pub fn run_with_provider(
138        &mut self,
139        provider: &Provider,
140        tip_block_number: BlockNumber,
141    ) -> PrunerResult {
142        let Some(tip_block_number) =
143            self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
144        else {
145            return Ok(PruneProgress::Finished.into())
146        };
147        if tip_block_number == 0 {
148            self.previous_tip_block_number = Some(tip_block_number);
149
150            debug!(target: "pruner", %tip_block_number, "Nothing to prune yet");
151            return Ok(PruneProgress::Finished.into())
152        }
153
154        self.event_sender.notify(PrunerEvent::Started { tip_block_number });
155
156        debug!(target: "pruner", %tip_block_number, "Pruner started");
157        let start = Instant::now();
158
159        let mut limiter = PruneLimiter::default().set_deleted_entries_limit(self.delete_limit);
160        if let Some(timeout) = self.timeout {
161            limiter = limiter.set_time_limit(timeout);
162        };
163
164        let (stats, deleted_entries, output) =
165            self.prune_segments(provider, tip_block_number, &mut limiter)?;
166
167        self.previous_tip_block_number = Some(tip_block_number);
168
169        let elapsed = start.elapsed();
170        self.metrics.duration_seconds.record(elapsed);
171
172        output.debug_log(tip_block_number, deleted_entries, elapsed);
173
174        self.event_sender.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats });
175
176        Ok(output)
177    }
178
179    /// Prunes the segments that the [Pruner] was initialized with, and the segments that needs to
180    /// be pruned according to the highest `static_files`. Segments are parts of the database that
181    /// represent one or more tables.
182    ///
183    /// Returns a list of stats per pruned segment, total number of entries pruned, and
184    /// [`PruneProgress`].
185    #[instrument(level = "debug", target = "pruner", skip_all, fields(segments = self.segments.len()))]
186    fn prune_segments(
187        &mut self,
188        provider: &Provider,
189        tip_block_number: BlockNumber,
190        limiter: &mut PruneLimiter,
191    ) -> Result<(Vec<PrunedSegmentInfo>, usize, PrunerOutput), PrunerError> {
192        let mut stats = Vec::with_capacity(self.segments.len());
193        let mut pruned = 0;
194        let mut output = PrunerOutput {
195            progress: PruneProgress::Finished,
196            segments: Vec::with_capacity(self.segments.len()),
197        };
198
199        for segment in &self.segments {
200            if limiter.is_limit_reached() {
201                output.progress =
202                    output.progress.combine(PruneProgress::HasMoreData(limiter.interrupt_reason()));
203                break
204            }
205
206            if let Some((to_block, prune_mode)) = segment
207                .mode()
208                .map(|mode| {
209                    mode.prune_target_block_with_min(
210                        tip_block_number,
211                        segment.segment(),
212                        segment.purpose(),
213                        self.minimum_pruning_distance,
214                    )
215                })
216                .transpose()?
217                .flatten()
218            {
219                // Check if segment has a required stage that must be finished first
220                if let Some(required_stage) = segment.required_stage() &&
221                    !is_stage_finished(provider, required_stage)?
222                {
223                    debug!(
224                        target: "pruner",
225                        segment = ?segment.segment(),
226                        ?required_stage,
227                        "Segment's required stage not finished, skipping"
228                    );
229                    continue
230                }
231
232                debug!(
233                    target: "pruner",
234                    segment = ?segment.segment(),
235                    purpose = ?segment.purpose(),
236                    %to_block,
237                    ?prune_mode,
238                    "Segment pruning started"
239                );
240
241                let segment_start = Instant::now();
242                let previous_checkpoint = provider.get_prune_checkpoint(segment.segment())?;
243                let segment_output = segment.prune(
244                    provider,
245                    PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() },
246                )?;
247                if let Some(checkpoint) = segment_output.checkpoint {
248                    segment
249                        .save_checkpoint(provider, checkpoint.as_prune_checkpoint(prune_mode))?;
250                }
251                self.metrics
252                    .get_prune_segment_metrics(segment.segment())
253                    .duration_seconds
254                    .record(segment_start.elapsed());
255                if let Some(highest_pruned_block) =
256                    segment_output.checkpoint.and_then(|checkpoint| checkpoint.block_number)
257                {
258                    self.metrics
259                        .get_prune_segment_metrics(segment.segment())
260                        .highest_pruned_block
261                        .set(highest_pruned_block as f64);
262                }
263
264                output.progress = output.progress.combine(segment_output.progress);
265                output.segments.push((segment.segment(), segment_output));
266
267                debug!(
268                    target: "pruner",
269                    segment = ?segment.segment(),
270                    purpose = ?segment.purpose(),
271                    %to_block,
272                    ?prune_mode,
273                    %segment_output.pruned,
274                    "Segment pruning finished"
275                );
276
277                if segment_output.pruned > 0 {
278                    limiter.increment_deleted_entries_count_by(segment_output.pruned);
279                    pruned += segment_output.pruned;
280                    let info = PrunedSegmentInfo {
281                        segment: segment.segment(),
282                        pruned: segment_output.pruned,
283                        progress: segment_output.progress,
284                    };
285                    stats.push(info);
286                }
287            } else {
288                debug!(target: "pruner", segment = ?segment.segment(), purpose = ?segment.purpose(), "Nothing to prune for the segment");
289            }
290        }
291
292        Ok((stats, pruned, output))
293    }
294
295    /// Returns `true` if the pruning is needed at the provided tip block number.
296    /// This is determined by the check against minimum pruning interval and last pruned block
297    /// number.
298    pub fn is_pruning_needed(&self, tip_block_number: BlockNumber) -> bool {
299        let Some(tip_block_number) =
300            self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
301        else {
302            return false
303        };
304
305        // Saturating subtraction is needed for the case when the chain was reverted, meaning
306        // current block number might be less than the previous tip block number.
307        // If that's the case, no pruning is needed as outdated data is also reverted.
308        if tip_block_number.saturating_sub(self.previous_tip_block_number.unwrap_or_default()) >=
309            self.min_block_interval as u64
310        {
311            debug!(
312                target: "pruner",
313                previous_tip_block_number = ?self.previous_tip_block_number,
314                %tip_block_number,
315                "Minimum pruning interval reached"
316            );
317            true
318        } else {
319            false
320        }
321    }
322
323    /// Adjusts the tip block number to the finished `ExEx` height. This is needed to not prune more
324    /// data than `ExExs` have processed. Depending on the height:
325    /// - [`FinishedExExHeight::NoExExs`] returns the tip block number as no adjustment for `ExExs`
326    ///   is needed.
327    /// - [`FinishedExExHeight::NotReady`] returns `None` as not all `ExExs` have emitted a
328    ///   `FinishedHeight` event yet.
329    /// - [`FinishedExExHeight::Height`] returns the finished `ExEx` height.
330    fn adjust_tip_block_number_to_finished_exex_height(
331        &self,
332        tip_block_number: BlockNumber,
333    ) -> Option<BlockNumber> {
334        match *self.finished_exex_height.borrow() {
335            FinishedExExHeight::NoExExs => Some(tip_block_number),
336            FinishedExExHeight::NotReady => {
337                debug!(target: "pruner", %tip_block_number, "Not all ExExs have emitted a `FinishedHeight` event yet, can't prune");
338                None
339            }
340            FinishedExExHeight::Height(finished_exex_height) => {
341                debug!(target: "pruner", %tip_block_number, %finished_exex_height, "Adjusting tip block number to the finished ExEx height");
342                Some(finished_exex_height)
343            }
344        }
345    }
346}
347
348impl<PF> Pruner<PF::ProviderRW, PF>
349where
350    PF: DatabaseProviderFactory<
351        ProviderRW: PruneCheckpointWriter + PruneCheckpointReader + StageCheckpointReader,
352    >,
353{
354    /// Run the pruner. This will only prune data up to the highest finished ExEx height, if there
355    /// are no ExExes.
356    ///
357    /// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
358    /// to prune.
359    #[instrument(name = "Pruner::run", level = "debug", target = "pruner", skip(self))]
360    pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
361        let provider = self.provider_factory.database_provider_rw()?;
362        let result = self.run_with_provider(&provider, tip_block_number);
363        provider.commit()?;
364        result
365    }
366}
367
368/// Checks if the given stage has caught up with the `Finish` stage.
369///
370/// Returns `true` if the stage checkpoint is >= the Finish stage checkpoint.
371fn is_stage_finished<Provider: StageCheckpointReader>(
372    provider: &Provider,
373    stage_id: StageId,
374) -> Result<bool, PrunerError> {
375    let stage_checkpoint = provider.get_stage_checkpoint(stage_id)?.map(|c| c.block_number);
376    let finish_checkpoint = provider.get_stage_checkpoint(StageId::Finish)?.map(|c| c.block_number);
377
378    Ok(stage_checkpoint >= finish_checkpoint)
379}
380
#[cfg(test)]
mod tests {
    use crate::Pruner;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;

    /// `is_pruning_needed` must respect the minimum block interval and the finished `ExEx`
    /// height reported through the watch channel.
    #[test]
    fn is_pruning_needed() {
        let factory = create_test_provider_factory();

        let (exex_height_tx, exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let mut pruner = Pruner::new_with_factory(factory, vec![], 5, 0, None, exex_height_rx);
        let interval = pruner.min_block_interval as u64;

        // Nothing recorded yet, and block 1 is below the interval.
        let first = 1;
        assert!(!pruner.is_pruning_needed(first));
        pruner.previous_tip_block_number = Some(first);

        // Advancing by exactly the interval triggers pruning.
        let second = first + interval;
        assert!(pruner.is_pruning_needed(second));
        pruner.previous_tip_block_number = Some(second);

        // No advance since the previous tip: not needed.
        assert!(!pruner.is_pruning_needed(second));

        // Another full interval: needed again.
        let third = second + interval;
        assert!(pruner.is_pruning_needed(third));

        // ExExs have not all reported a finished height.
        exex_height_tx.send(FinishedExExHeight::NotReady).unwrap();
        assert!(!pruner.is_pruning_needed(third));

        // ExEx height held back below the threshold.
        exex_height_tx.send(FinishedExExHeight::Height(second)).unwrap();
        assert!(!pruner.is_pruning_needed(third));

        // ExEx height reaching the threshold.
        exex_height_tx.send(FinishedExExHeight::Height(third)).unwrap();
        assert!(pruner.is_pruning_needed(third));
    }
}