Skip to main content

reth_prune/
pruner.rs

1//! Support for pruning.
2
3use crate::{
4    segments::{PruneInput, Segment},
5    Metrics, PruneLimiter, PrunerError, PrunerEvent,
6};
7use alloy_primitives::BlockNumber;
8use reth_exex_types::FinishedExExHeight;
9use reth_primitives_traits::FastInstant as Instant;
10use reth_provider::{
11    DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
12    StageCheckpointReader,
13};
14use reth_prune_types::{PruneProgress, PrunedSegmentInfo, PrunerOutput};
15use reth_stages_types::StageId;
16use reth_tokio_util::{EventSender, EventStream};
17use std::time::Duration;
18use tokio::sync::watch;
19use tracing::{debug, instrument};
20
21/// Result of [`Pruner::run`] execution.
22pub type PrunerResult = Result<PrunerOutput, PrunerError>;
23
24/// The pruner type itself with the result of [`Pruner::run`]
25pub type PrunerWithResult<S, DB> = (Pruner<S, DB>, PrunerResult);
26
27/// Pruner with preset provider factory.
28pub type PrunerWithFactory<PF> = Pruner<<PF as DatabaseProviderFactory>::ProviderRW, PF>;
29
30/// Pruning routine. Main pruning logic happens in [`Pruner::run`].
31#[derive(Debug)]
32pub struct Pruner<Provider, PF> {
33    /// Provider factory. If pruner is initialized without it, it will be set to `()`.
34    provider_factory: PF,
35    segments: Vec<Box<dyn Segment<Provider>>>,
36    /// Minimum pruning interval measured in blocks. All prune segments are checked and, if needed,
37    /// pruned, when the chain advances by the specified number of blocks.
38    min_block_interval: usize,
39    /// Previous tip block number when the pruner was run. Even if no data was pruned, this block
40    /// number is updated with the tip block number the pruner was called with. It's used in
41    /// conjunction with `min_block_interval` to determine when the pruning needs to be initiated.
42    previous_tip_block_number: Option<BlockNumber>,
43    /// Maximum total entries to prune (delete from database) per run.
44    delete_limit: usize,
45    /// Maximum time for one pruner run.
46    timeout: Option<Duration>,
47    /// The finished height of all `ExEx`'s.
48    finished_exex_height: watch::Receiver<FinishedExExHeight>,
49    #[doc(hidden)]
50    metrics: Metrics,
51    event_sender: EventSender<PrunerEvent>,
52}
53
54impl<Provider> Pruner<Provider, ()> {
55    /// Creates a new [Pruner] without a provider factory.
56    pub fn new(
57        segments: Vec<Box<dyn Segment<Provider>>>,
58        min_block_interval: usize,
59        delete_limit: usize,
60        timeout: Option<Duration>,
61        finished_exex_height: watch::Receiver<FinishedExExHeight>,
62    ) -> Self {
63        Self {
64            provider_factory: (),
65            segments,
66            min_block_interval,
67            previous_tip_block_number: None,
68            delete_limit,
69            timeout,
70            finished_exex_height,
71            metrics: Metrics::default(),
72            event_sender: Default::default(),
73        }
74    }
75}
76
77impl<PF> Pruner<PF::ProviderRW, PF>
78where
79    PF: DatabaseProviderFactory,
80{
81    /// Crates a new pruner with the given provider factory.
82    pub fn new_with_factory(
83        provider_factory: PF,
84        segments: Vec<Box<dyn Segment<PF::ProviderRW>>>,
85        min_block_interval: usize,
86        delete_limit: usize,
87        timeout: Option<Duration>,
88        finished_exex_height: watch::Receiver<FinishedExExHeight>,
89    ) -> Self {
90        Self {
91            provider_factory,
92            segments,
93            min_block_interval,
94            previous_tip_block_number: None,
95            delete_limit,
96            timeout,
97            finished_exex_height,
98            metrics: Metrics::default(),
99            event_sender: Default::default(),
100        }
101    }
102}
103
104impl<Provider, S> Pruner<Provider, S>
105where
106    Provider: PruneCheckpointReader + PruneCheckpointWriter + StageCheckpointReader,
107{
108    /// Listen for events on the pruner.
109    pub fn events(&self) -> EventStream<PrunerEvent> {
110        self.event_sender.new_listener()
111    }
112
113    /// Run the pruner with the given provider. This will only prune data up to the highest finished
114    /// `ExEx` height, if there are no `ExExes`.
115    ///
116    /// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
117    /// to prune.
118    #[instrument(
119        name = "Pruner::run_with_provider",
120        level = "debug",
121        target = "pruner",
122        skip(self, provider)
123    )]
124    pub fn run_with_provider(
125        &mut self,
126        provider: &Provider,
127        tip_block_number: BlockNumber,
128    ) -> PrunerResult {
129        let Some(tip_block_number) =
130            self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
131        else {
132            return Ok(PruneProgress::Finished.into())
133        };
134        if tip_block_number == 0 {
135            self.previous_tip_block_number = Some(tip_block_number);
136
137            debug!(target: "pruner", %tip_block_number, "Nothing to prune yet");
138            return Ok(PruneProgress::Finished.into())
139        }
140
141        self.event_sender.notify(PrunerEvent::Started { tip_block_number });
142
143        debug!(target: "pruner", %tip_block_number, "Pruner started");
144        let start = Instant::now();
145
146        let mut limiter = PruneLimiter::default().set_deleted_entries_limit(self.delete_limit);
147        if let Some(timeout) = self.timeout {
148            limiter = limiter.set_time_limit(timeout);
149        };
150
151        let (stats, deleted_entries, output) =
152            self.prune_segments(provider, tip_block_number, &mut limiter)?;
153
154        self.previous_tip_block_number = Some(tip_block_number);
155
156        let elapsed = start.elapsed();
157        self.metrics.duration_seconds.record(elapsed);
158
159        output.debug_log(tip_block_number, deleted_entries, elapsed);
160
161        self.event_sender.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats });
162
163        Ok(output)
164    }
165
166    /// Prunes the segments that the [Pruner] was initialized with, and the segments that needs to
167    /// be pruned according to the highest `static_files`. Segments are parts of the database that
168    /// represent one or more tables.
169    ///
170    /// Returns a list of stats per pruned segment, total number of entries pruned, and
171    /// [`PruneProgress`].
172    #[instrument(level = "debug", target = "pruner", skip_all, fields(segments = self.segments.len()))]
173    fn prune_segments(
174        &mut self,
175        provider: &Provider,
176        tip_block_number: BlockNumber,
177        limiter: &mut PruneLimiter,
178    ) -> Result<(Vec<PrunedSegmentInfo>, usize, PrunerOutput), PrunerError> {
179        let mut stats = Vec::with_capacity(self.segments.len());
180        let mut pruned = 0;
181        let mut output = PrunerOutput {
182            progress: PruneProgress::Finished,
183            segments: Vec::with_capacity(self.segments.len()),
184        };
185
186        for segment in &self.segments {
187            if limiter.is_limit_reached() {
188                output.progress =
189                    output.progress.combine(PruneProgress::HasMoreData(limiter.interrupt_reason()));
190                break
191            }
192
193            if let Some((to_block, prune_mode)) = segment
194                .mode()
195                .map(|mode| {
196                    mode.prune_target_block(tip_block_number, segment.segment(), segment.purpose())
197                })
198                .transpose()?
199                .flatten()
200            {
201                // Check if segment has a required stage that must be finished first
202                if let Some(required_stage) = segment.required_stage() &&
203                    !is_stage_finished(provider, required_stage)?
204                {
205                    debug!(
206                        target: "pruner",
207                        segment = ?segment.segment(),
208                        ?required_stage,
209                        "Segment's required stage not finished, skipping"
210                    );
211                    continue
212                }
213
214                debug!(
215                    target: "pruner",
216                    segment = ?segment.segment(),
217                    purpose = ?segment.purpose(),
218                    %to_block,
219                    ?prune_mode,
220                    "Segment pruning started"
221                );
222
223                let segment_start = Instant::now();
224                let previous_checkpoint = provider.get_prune_checkpoint(segment.segment())?;
225                let segment_output = segment.prune(
226                    provider,
227                    PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() },
228                )?;
229                if let Some(checkpoint) = segment_output.checkpoint {
230                    segment
231                        .save_checkpoint(provider, checkpoint.as_prune_checkpoint(prune_mode))?;
232                }
233                self.metrics
234                    .get_prune_segment_metrics(segment.segment())
235                    .duration_seconds
236                    .record(segment_start.elapsed());
237                if let Some(highest_pruned_block) =
238                    segment_output.checkpoint.and_then(|checkpoint| checkpoint.block_number)
239                {
240                    self.metrics
241                        .get_prune_segment_metrics(segment.segment())
242                        .highest_pruned_block
243                        .set(highest_pruned_block as f64);
244                }
245
246                output.progress = output.progress.combine(segment_output.progress);
247                output.segments.push((segment.segment(), segment_output));
248
249                debug!(
250                    target: "pruner",
251                    segment = ?segment.segment(),
252                    purpose = ?segment.purpose(),
253                    %to_block,
254                    ?prune_mode,
255                    %segment_output.pruned,
256                    "Segment pruning finished"
257                );
258
259                if segment_output.pruned > 0 {
260                    limiter.increment_deleted_entries_count_by(segment_output.pruned);
261                    pruned += segment_output.pruned;
262                    let info = PrunedSegmentInfo {
263                        segment: segment.segment(),
264                        pruned: segment_output.pruned,
265                        progress: segment_output.progress,
266                    };
267                    stats.push(info);
268                }
269            } else {
270                debug!(target: "pruner", segment = ?segment.segment(), purpose = ?segment.purpose(), "Nothing to prune for the segment");
271            }
272        }
273
274        Ok((stats, pruned, output))
275    }
276
277    /// Returns `true` if the pruning is needed at the provided tip block number.
278    /// This is determined by the check against minimum pruning interval and last pruned block
279    /// number.
280    pub fn is_pruning_needed(&self, tip_block_number: BlockNumber) -> bool {
281        let Some(tip_block_number) =
282            self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
283        else {
284            return false
285        };
286
287        // Saturating subtraction is needed for the case when the chain was reverted, meaning
288        // current block number might be less than the previous tip block number.
289        // If that's the case, no pruning is needed as outdated data is also reverted.
290        if tip_block_number.saturating_sub(self.previous_tip_block_number.unwrap_or_default()) >=
291            self.min_block_interval as u64
292        {
293            debug!(
294                target: "pruner",
295                previous_tip_block_number = ?self.previous_tip_block_number,
296                %tip_block_number,
297                "Minimum pruning interval reached"
298            );
299            true
300        } else {
301            false
302        }
303    }
304
305    /// Adjusts the tip block number to the finished `ExEx` height. This is needed to not prune more
306    /// data than `ExExs` have processed. Depending on the height:
307    /// - [`FinishedExExHeight::NoExExs`] returns the tip block number as no adjustment for `ExExs`
308    ///   is needed.
309    /// - [`FinishedExExHeight::NotReady`] returns `None` as not all `ExExs` have emitted a
310    ///   `FinishedHeight` event yet.
311    /// - [`FinishedExExHeight::Height`] returns the finished `ExEx` height.
312    fn adjust_tip_block_number_to_finished_exex_height(
313        &self,
314        tip_block_number: BlockNumber,
315    ) -> Option<BlockNumber> {
316        match *self.finished_exex_height.borrow() {
317            FinishedExExHeight::NoExExs => Some(tip_block_number),
318            FinishedExExHeight::NotReady => {
319                debug!(target: "pruner", %tip_block_number, "Not all ExExs have emitted a `FinishedHeight` event yet, can't prune");
320                None
321            }
322            FinishedExExHeight::Height(finished_exex_height) => {
323                debug!(target: "pruner", %tip_block_number, %finished_exex_height, "Adjusting tip block number to the finished ExEx height");
324                Some(finished_exex_height)
325            }
326        }
327    }
328}
329
330impl<PF> Pruner<PF::ProviderRW, PF>
331where
332    PF: DatabaseProviderFactory<
333        ProviderRW: PruneCheckpointWriter + PruneCheckpointReader + StageCheckpointReader,
334    >,
335{
336    /// Run the pruner. This will only prune data up to the highest finished ExEx height, if there
337    /// are no ExExes.
338    ///
339    /// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
340    /// to prune.
341    #[instrument(name = "Pruner::run", level = "debug", target = "pruner", skip(self))]
342    pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
343        let provider = self.provider_factory.database_provider_rw()?;
344        let result = self.run_with_provider(&provider, tip_block_number);
345        provider.commit()?;
346        result
347    }
348}
349
350/// Checks if the given stage has caught up with the `Finish` stage.
351///
352/// Returns `true` if the stage checkpoint is >= the Finish stage checkpoint.
353fn is_stage_finished<Provider: StageCheckpointReader>(
354    provider: &Provider,
355    stage_id: StageId,
356) -> Result<bool, PrunerError> {
357    let stage_checkpoint = provider.get_stage_checkpoint(stage_id)?.map(|c| c.block_number);
358    let finish_checkpoint = provider.get_stage_checkpoint(StageId::Finish)?.map(|c| c.block_number);
359
360    Ok(stage_checkpoint >= finish_checkpoint)
361}
362
#[cfg(test)]
mod tests {
    use crate::Pruner;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;

    /// Walks `Pruner::is_pruning_needed` through the minimum block interval check and through
    /// every `FinishedExExHeight` state.
    #[test]
    fn is_pruning_needed() {
        let (exex_height_tx, exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let mut pruner = Pruner::new_with_factory(
            create_test_provider_factory(),
            vec![],
            5,
            0,
            None,
            exex_height_rx,
        );
        let interval = pruner.min_block_interval as u64;

        // No previous tip was recorded and the first block is below the interval
        let first_tip = 1;
        assert!(!pruner.is_pruning_needed(first_tip));
        pruner.previous_tip_block_number = Some(first_tip);

        // Exactly one interval ahead of the previous tip
        let second_tip = first_tip + interval;
        assert!(pruner.is_pruning_needed(second_tip));
        pruner.previous_tip_block_number = Some(second_tip);

        // Same tip as the previous run, so the delta is below the interval
        assert!(!pruner.is_pruning_needed(second_tip));

        // One more interval ahead of the previous tip
        let third_tip = second_tip + interval;
        assert!(pruner.is_pruning_needed(third_tip));

        // Not all ExExs have emitted a `FinishedHeight` event yet
        exex_height_tx.send(FinishedExExHeight::NotReady).unwrap();
        assert!(!pruner.is_pruning_needed(third_tip));

        // The finished ExEx height replaces the tip and does not reach the threshold
        exex_height_tx.send(FinishedExExHeight::Height(second_tip)).unwrap();
        assert!(!pruner.is_pruning_needed(third_tip));

        // The finished ExEx height replaces the tip and reaches the threshold
        exex_height_tx.send(FinishedExExHeight::Height(third_tip)).unwrap();
        assert!(pruner.is_pruning_needed(third_tip));
    }
}