Skip to main content

reth_prune/
pruner.rs

1//! Support for pruning.
2
3use crate::{
4    segments::{PruneInput, Segment},
5    Metrics, PruneLimiter, PrunerError, PrunerEvent,
6};
7use alloy_primitives::BlockNumber;
8use reth_exex_types::FinishedExExHeight;
9use reth_provider::{
10    DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
11    StageCheckpointReader,
12};
13use reth_prune_types::{PruneProgress, PrunedSegmentInfo, PrunerOutput};
14use reth_stages_types::StageId;
15use reth_tokio_util::{EventSender, EventStream};
16use std::time::{Duration, Instant};
17use tokio::sync::watch;
18use tracing::{debug, instrument};
19
/// Result of [`Pruner::run`] execution: a [`PrunerOutput`] on success, or a [`PrunerError`].
pub type PrunerResult = Result<PrunerOutput, PrunerError>;

/// The pruner type itself together with the [`PrunerResult`] of [`Pruner::run`].
pub type PrunerWithResult<S, DB> = (Pruner<S, DB>, PrunerResult);

/// Pruner with preset provider factory: the provider type is the factory's read-write provider.
pub type PrunerWithFactory<PF> = Pruner<<PF as DatabaseProviderFactory>::ProviderRW, PF>;
28
/// Pruning routine. Main pruning logic happens in [`Pruner::run`].
///
/// `Provider` is the database provider type the segments prune against, and `PF` is the provider
/// factory type (or `()` when the pruner is created without one).
#[derive(Debug)]
pub struct Pruner<Provider, PF> {
    /// Provider factory. If pruner is initialized without it, it will be set to `()`.
    provider_factory: PF,
    /// Segments to prune. They are visited in this order on every pruner run.
    segments: Vec<Box<dyn Segment<Provider>>>,
    /// Minimum pruning interval measured in blocks. All prune segments are checked and, if needed,
    /// pruned, when the chain advances by the specified number of blocks.
    min_block_interval: usize,
    /// Previous tip block number when the pruner was run. Even if no data was pruned, this block
    /// number is updated with the tip block number the pruner was called with. It's used in
    /// conjunction with `min_block_interval` to determine when the pruning needs to be initiated.
    previous_tip_block_number: Option<BlockNumber>,
    /// Maximum total entries to prune (delete from database) per run.
    delete_limit: usize,
    /// Maximum time for one pruner run. `None` means no time limit is set on the limiter.
    timeout: Option<Duration>,
    /// The finished height of all `ExEx`'s.
    finished_exex_height: watch::Receiver<FinishedExExHeight>,
    /// Metrics recorded by the pruner: total run duration and per-segment measurements.
    #[doc(hidden)]
    metrics: Metrics,
    /// Sender used to notify listeners about [`PrunerEvent`]s (run started / run finished).
    event_sender: EventSender<PrunerEvent>,
}
52
53impl<Provider> Pruner<Provider, ()> {
54    /// Creates a new [Pruner] without a provider factory.
55    pub fn new(
56        segments: Vec<Box<dyn Segment<Provider>>>,
57        min_block_interval: usize,
58        delete_limit: usize,
59        timeout: Option<Duration>,
60        finished_exex_height: watch::Receiver<FinishedExExHeight>,
61    ) -> Self {
62        Self {
63            provider_factory: (),
64            segments,
65            min_block_interval,
66            previous_tip_block_number: None,
67            delete_limit,
68            timeout,
69            finished_exex_height,
70            metrics: Metrics::default(),
71            event_sender: Default::default(),
72        }
73    }
74}
75
76impl<PF> Pruner<PF::ProviderRW, PF>
77where
78    PF: DatabaseProviderFactory,
79{
80    /// Crates a new pruner with the given provider factory.
81    pub fn new_with_factory(
82        provider_factory: PF,
83        segments: Vec<Box<dyn Segment<PF::ProviderRW>>>,
84        min_block_interval: usize,
85        delete_limit: usize,
86        timeout: Option<Duration>,
87        finished_exex_height: watch::Receiver<FinishedExExHeight>,
88    ) -> Self {
89        Self {
90            provider_factory,
91            segments,
92            min_block_interval,
93            previous_tip_block_number: None,
94            delete_limit,
95            timeout,
96            finished_exex_height,
97            metrics: Metrics::default(),
98            event_sender: Default::default(),
99        }
100    }
101}
102
103impl<Provider, S> Pruner<Provider, S>
104where
105    Provider: PruneCheckpointReader + PruneCheckpointWriter + StageCheckpointReader,
106{
    /// Listen for events on the pruner.
    ///
    /// Each call registers a new listener on the pruner's event sender and returns its
    /// [`EventStream`] of [`PrunerEvent`]s.
    pub fn events(&self) -> EventStream<PrunerEvent> {
        self.event_sender.new_listener()
    }
111
112    /// Run the pruner with the given provider. This will only prune data up to the highest finished
113    /// `ExEx` height, if there are no `ExExes`.
114    ///
115    /// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
116    /// to prune.
117    #[instrument(
118        name = "Pruner::run_with_provider",
119        level = "debug",
120        target = "pruner",
121        skip(self, provider)
122    )]
123    pub fn run_with_provider(
124        &mut self,
125        provider: &Provider,
126        tip_block_number: BlockNumber,
127    ) -> PrunerResult {
128        let Some(tip_block_number) =
129            self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
130        else {
131            return Ok(PruneProgress::Finished.into())
132        };
133        if tip_block_number == 0 {
134            self.previous_tip_block_number = Some(tip_block_number);
135
136            debug!(target: "pruner", %tip_block_number, "Nothing to prune yet");
137            return Ok(PruneProgress::Finished.into())
138        }
139
140        self.event_sender.notify(PrunerEvent::Started { tip_block_number });
141
142        debug!(target: "pruner", %tip_block_number, "Pruner started");
143        let start = Instant::now();
144
145        let mut limiter = PruneLimiter::default().set_deleted_entries_limit(self.delete_limit);
146        if let Some(timeout) = self.timeout {
147            limiter = limiter.set_time_limit(timeout);
148        };
149
150        let (stats, deleted_entries, output) =
151            self.prune_segments(provider, tip_block_number, &mut limiter)?;
152
153        self.previous_tip_block_number = Some(tip_block_number);
154
155        let elapsed = start.elapsed();
156        self.metrics.duration_seconds.record(elapsed);
157
158        output.debug_log(tip_block_number, deleted_entries, elapsed);
159
160        self.event_sender.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats });
161
162        Ok(output)
163    }
164
165    /// Prunes the segments that the [Pruner] was initialized with, and the segments that needs to
166    /// be pruned according to the highest `static_files`. Segments are parts of the database that
167    /// represent one or more tables.
168    ///
169    /// Returns a list of stats per pruned segment, total number of entries pruned, and
170    /// [`PruneProgress`].
171    #[instrument(level = "debug", target = "pruner", skip_all, fields(segments = self.segments.len()))]
172    fn prune_segments(
173        &mut self,
174        provider: &Provider,
175        tip_block_number: BlockNumber,
176        limiter: &mut PruneLimiter,
177    ) -> Result<(Vec<PrunedSegmentInfo>, usize, PrunerOutput), PrunerError> {
178        let mut stats = Vec::with_capacity(self.segments.len());
179        let mut pruned = 0;
180        let mut output = PrunerOutput {
181            progress: PruneProgress::Finished,
182            segments: Vec::with_capacity(self.segments.len()),
183        };
184
185        for segment in &self.segments {
186            if limiter.is_limit_reached() {
187                output.progress =
188                    output.progress.combine(PruneProgress::HasMoreData(limiter.interrupt_reason()));
189                break
190            }
191
192            if let Some((to_block, prune_mode)) = segment
193                .mode()
194                .map(|mode| {
195                    mode.prune_target_block(tip_block_number, segment.segment(), segment.purpose())
196                })
197                .transpose()?
198                .flatten()
199            {
200                // Check if segment has a required stage that must be finished first
201                if let Some(required_stage) = segment.required_stage() &&
202                    !is_stage_finished(provider, required_stage)?
203                {
204                    debug!(
205                        target: "pruner",
206                        segment = ?segment.segment(),
207                        ?required_stage,
208                        "Segment's required stage not finished, skipping"
209                    );
210                    continue
211                }
212
213                debug!(
214                    target: "pruner",
215                    segment = ?segment.segment(),
216                    purpose = ?segment.purpose(),
217                    %to_block,
218                    ?prune_mode,
219                    "Segment pruning started"
220                );
221
222                let segment_start = Instant::now();
223                let previous_checkpoint = provider.get_prune_checkpoint(segment.segment())?;
224                let segment_output = segment.prune(
225                    provider,
226                    PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() },
227                )?;
228                if let Some(checkpoint) = segment_output.checkpoint {
229                    segment
230                        .save_checkpoint(provider, checkpoint.as_prune_checkpoint(prune_mode))?;
231                }
232                self.metrics
233                    .get_prune_segment_metrics(segment.segment())
234                    .duration_seconds
235                    .record(segment_start.elapsed());
236                if let Some(highest_pruned_block) =
237                    segment_output.checkpoint.and_then(|checkpoint| checkpoint.block_number)
238                {
239                    self.metrics
240                        .get_prune_segment_metrics(segment.segment())
241                        .highest_pruned_block
242                        .set(highest_pruned_block as f64);
243                }
244
245                output.progress = output.progress.combine(segment_output.progress);
246                output.segments.push((segment.segment(), segment_output));
247
248                debug!(
249                    target: "pruner",
250                    segment = ?segment.segment(),
251                    purpose = ?segment.purpose(),
252                    %to_block,
253                    ?prune_mode,
254                    %segment_output.pruned,
255                    "Segment pruning finished"
256                );
257
258                if segment_output.pruned > 0 {
259                    limiter.increment_deleted_entries_count_by(segment_output.pruned);
260                    pruned += segment_output.pruned;
261                    let info = PrunedSegmentInfo {
262                        segment: segment.segment(),
263                        pruned: segment_output.pruned,
264                        progress: segment_output.progress,
265                    };
266                    stats.push(info);
267                }
268            } else {
269                debug!(target: "pruner", segment = ?segment.segment(), purpose = ?segment.purpose(), "Nothing to prune for the segment");
270            }
271        }
272
273        Ok((stats, pruned, output))
274    }
275
276    /// Returns `true` if the pruning is needed at the provided tip block number.
277    /// This is determined by the check against minimum pruning interval and last pruned block
278    /// number.
279    pub fn is_pruning_needed(&self, tip_block_number: BlockNumber) -> bool {
280        let Some(tip_block_number) =
281            self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
282        else {
283            return false
284        };
285
286        // Saturating subtraction is needed for the case when the chain was reverted, meaning
287        // current block number might be less than the previous tip block number.
288        // If that's the case, no pruning is needed as outdated data is also reverted.
289        if tip_block_number.saturating_sub(self.previous_tip_block_number.unwrap_or_default()) >=
290            self.min_block_interval as u64
291        {
292            debug!(
293                target: "pruner",
294                previous_tip_block_number = ?self.previous_tip_block_number,
295                %tip_block_number,
296                "Minimum pruning interval reached"
297            );
298            true
299        } else {
300            false
301        }
302    }
303
304    /// Adjusts the tip block number to the finished `ExEx` height. This is needed to not prune more
305    /// data than `ExExs` have processed. Depending on the height:
306    /// - [`FinishedExExHeight::NoExExs`] returns the tip block number as no adjustment for `ExExs`
307    ///   is needed.
308    /// - [`FinishedExExHeight::NotReady`] returns `None` as not all `ExExs` have emitted a
309    ///   `FinishedHeight` event yet.
310    /// - [`FinishedExExHeight::Height`] returns the finished `ExEx` height.
311    fn adjust_tip_block_number_to_finished_exex_height(
312        &self,
313        tip_block_number: BlockNumber,
314    ) -> Option<BlockNumber> {
315        match *self.finished_exex_height.borrow() {
316            FinishedExExHeight::NoExExs => Some(tip_block_number),
317            FinishedExExHeight::NotReady => {
318                debug!(target: "pruner", %tip_block_number, "Not all ExExs have emitted a `FinishedHeight` event yet, can't prune");
319                None
320            }
321            FinishedExExHeight::Height(finished_exex_height) => {
322                debug!(target: "pruner", %tip_block_number, %finished_exex_height, "Adjusting tip block number to the finished ExEx height");
323                Some(finished_exex_height)
324            }
325        }
326    }
327}
328
329impl<PF> Pruner<PF::ProviderRW, PF>
330where
331    PF: DatabaseProviderFactory<
332        ProviderRW: PruneCheckpointWriter + PruneCheckpointReader + StageCheckpointReader,
333    >,
334{
335    /// Run the pruner. This will only prune data up to the highest finished ExEx height, if there
336    /// are no ExExes.
337    ///
338    /// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
339    /// to prune.
340    #[instrument(name = "Pruner::run", level = "debug", target = "pruner", skip(self))]
341    pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
342        let provider = self.provider_factory.database_provider_rw()?;
343        let result = self.run_with_provider(&provider, tip_block_number);
344        provider.commit()?;
345        result
346    }
347}
348
349/// Checks if the given stage has caught up with the `Finish` stage.
350///
351/// Returns `true` if the stage checkpoint is >= the Finish stage checkpoint.
352fn is_stage_finished<Provider: StageCheckpointReader>(
353    provider: &Provider,
354    stage_id: StageId,
355) -> Result<bool, PrunerError> {
356    let stage_checkpoint = provider.get_stage_checkpoint(stage_id)?.map(|c| c.block_number);
357    let finish_checkpoint = provider.get_stage_checkpoint(StageId::Finish)?.map(|c| c.block_number);
358
359    Ok(stage_checkpoint >= finish_checkpoint)
360}
361
#[cfg(test)]
mod tests {
    use crate::Pruner;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;

    /// Walks `Pruner::is_pruning_needed` through the minimum-interval check and every
    /// `FinishedExExHeight` state.
    #[test]
    fn is_pruning_needed() {
        let (exex_height_tx, exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let mut pruner = Pruner::new_with_factory(
            create_test_provider_factory(),
            vec![],
            5,
            0,
            None,
            exex_height_rx,
        );
        let interval = pruner.min_block_interval as u64;

        // No previous tip was recorded yet, and the first tip is below the interval
        let first_tip = 1;
        assert!(!pruner.is_pruning_needed(first_tip));
        pruner.previous_tip_block_number = Some(first_tip);

        // Delta to the previous tip equals the interval: pruning is needed
        let second_tip = first_tip + interval;
        assert!(pruner.is_pruning_needed(second_tip));
        pruner.previous_tip_block_number = Some(second_tip);

        // Same tip again: delta is zero, below the interval
        assert!(!pruner.is_pruning_needed(second_tip));

        // Delta to the previous tip equals the interval again
        let third_tip = second_tip + interval;
        assert!(pruner.is_pruning_needed(third_tip));

        // Not all ExExs have emitted a `FinishedHeight` event yet
        exex_height_tx.send(FinishedExExHeight::NotReady).unwrap();
        assert!(!pruner.is_pruning_needed(third_tip));

        // The finished ExEx height replaces the tip and does not reach the threshold
        exex_height_tx.send(FinishedExExHeight::Height(second_tip)).unwrap();
        assert!(!pruner.is_pruning_needed(third_tip));

        // The finished ExEx height replaces the tip and reaches the threshold
        exex_height_tx.send(FinishedExExHeight::Height(third_tip)).unwrap();
        assert!(pruner.is_pruning_needed(third_tip));
    }
}