reth_prune/
pruner.rs

1//! Support for pruning.
2
3use crate::{
4    segments::{PruneInput, Segment},
5    Metrics, PruneLimiter, PrunerError, PrunerEvent,
6};
7use alloy_primitives::BlockNumber;
8use reth_exex_types::FinishedExExHeight;
9use reth_provider::{
10    DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
11};
12use reth_prune_types::{PruneProgress, PrunedSegmentInfo, PrunerOutput};
13use reth_tokio_util::{EventSender, EventStream};
14use std::time::{Duration, Instant};
15use tokio::sync::watch;
16use tracing::debug;
17
18/// Result of [`Pruner::run`] execution.
19pub type PrunerResult = Result<PrunerOutput, PrunerError>;
20
21/// The pruner type itself with the result of [`Pruner::run`]
22pub type PrunerWithResult<S, DB> = (Pruner<S, DB>, PrunerResult);
23
24/// Pruner with preset provider factory.
25pub type PrunerWithFactory<PF> = Pruner<<PF as DatabaseProviderFactory>::ProviderRW, PF>;
26
27/// Pruning routine. Main pruning logic happens in [`Pruner::run`].
28#[derive(Debug)]
29pub struct Pruner<Provider, PF> {
30    /// Provider factory. If pruner is initialized without it, it will be set to `()`.
31    provider_factory: PF,
32    segments: Vec<Box<dyn Segment<Provider>>>,
33    /// Minimum pruning interval measured in blocks. All prune segments are checked and, if needed,
34    /// pruned, when the chain advances by the specified number of blocks.
35    min_block_interval: usize,
36    /// Previous tip block number when the pruner was run. Even if no data was pruned, this block
37    /// number is updated with the tip block number the pruner was called with. It's used in
38    /// conjunction with `min_block_interval` to determine when the pruning needs to be initiated.
39    previous_tip_block_number: Option<BlockNumber>,
40    /// Maximum total entries to prune (delete from database) per run.
41    delete_limit: usize,
42    /// Maximum time for a one pruner run.
43    timeout: Option<Duration>,
44    /// The finished height of all `ExEx`'s.
45    finished_exex_height: watch::Receiver<FinishedExExHeight>,
46    #[doc(hidden)]
47    metrics: Metrics,
48    event_sender: EventSender<PrunerEvent>,
49}
50
51impl<Provider> Pruner<Provider, ()> {
52    /// Creates a new [Pruner] without a provider factory.
53    pub fn new(
54        segments: Vec<Box<dyn Segment<Provider>>>,
55        min_block_interval: usize,
56        delete_limit: usize,
57        timeout: Option<Duration>,
58        finished_exex_height: watch::Receiver<FinishedExExHeight>,
59    ) -> Self {
60        Self {
61            provider_factory: (),
62            segments,
63            min_block_interval,
64            previous_tip_block_number: None,
65            delete_limit,
66            timeout,
67            finished_exex_height,
68            metrics: Metrics::default(),
69            event_sender: Default::default(),
70        }
71    }
72}
73
74impl<PF> Pruner<PF::ProviderRW, PF>
75where
76    PF: DatabaseProviderFactory,
77{
78    /// Crates a new pruner with the given provider factory.
79    pub fn new_with_factory(
80        provider_factory: PF,
81        segments: Vec<Box<dyn Segment<PF::ProviderRW>>>,
82        min_block_interval: usize,
83        delete_limit: usize,
84        timeout: Option<Duration>,
85        finished_exex_height: watch::Receiver<FinishedExExHeight>,
86    ) -> Self {
87        Self {
88            provider_factory,
89            segments,
90            min_block_interval,
91            previous_tip_block_number: None,
92            delete_limit,
93            timeout,
94            finished_exex_height,
95            metrics: Metrics::default(),
96            event_sender: Default::default(),
97        }
98    }
99}
100
101impl<Provider, S> Pruner<Provider, S>
102where
103    Provider: PruneCheckpointReader + PruneCheckpointWriter,
104{
105    /// Listen for events on the pruner.
106    pub fn events(&self) -> EventStream<PrunerEvent> {
107        self.event_sender.new_listener()
108    }
109
110    /// Run the pruner with the given provider. This will only prune data up to the highest finished
111    /// `ExEx` height, if there are no `ExExes`.
112    ///
113    /// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
114    /// to prune.
115    pub fn run_with_provider(
116        &mut self,
117        provider: &Provider,
118        tip_block_number: BlockNumber,
119    ) -> PrunerResult {
120        let Some(tip_block_number) =
121            self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
122        else {
123            return Ok(PruneProgress::Finished.into())
124        };
125        if tip_block_number == 0 {
126            self.previous_tip_block_number = Some(tip_block_number);
127
128            debug!(target: "pruner", %tip_block_number, "Nothing to prune yet");
129            return Ok(PruneProgress::Finished.into())
130        }
131
132        self.event_sender.notify(PrunerEvent::Started { tip_block_number });
133
134        debug!(target: "pruner", %tip_block_number, "Pruner started");
135        let start = Instant::now();
136
137        let mut limiter = PruneLimiter::default().set_deleted_entries_limit(self.delete_limit);
138        if let Some(timeout) = self.timeout {
139            limiter = limiter.set_time_limit(timeout);
140        };
141
142        let (stats, deleted_entries, output) =
143            self.prune_segments(provider, tip_block_number, &mut limiter)?;
144
145        self.previous_tip_block_number = Some(tip_block_number);
146
147        let elapsed = start.elapsed();
148        self.metrics.duration_seconds.record(elapsed);
149
150        let message = match output.progress {
151            PruneProgress::HasMoreData(_) => "Pruner interrupted and has more data to prune",
152            PruneProgress::Finished => "Pruner finished",
153        };
154
155        debug!(
156            target: "pruner",
157            %tip_block_number,
158            ?elapsed,
159            ?deleted_entries,
160            ?limiter,
161            ?output,
162            ?stats,
163            "{message}",
164        );
165
166        self.event_sender.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats });
167
168        Ok(output)
169    }
170
171    /// Prunes the segments that the [Pruner] was initialized with, and the segments that needs to
172    /// be pruned according to the highest `static_files`. Segments are parts of the database that
173    /// represent one or more tables.
174    ///
175    /// Returns a list of stats per pruned segment, total number of entries pruned, and
176    /// [`PruneProgress`].
177    fn prune_segments(
178        &mut self,
179        provider: &Provider,
180        tip_block_number: BlockNumber,
181        limiter: &mut PruneLimiter,
182    ) -> Result<(Vec<PrunedSegmentInfo>, usize, PrunerOutput), PrunerError> {
183        let mut stats = Vec::with_capacity(self.segments.len());
184        let mut pruned = 0;
185        let mut output = PrunerOutput {
186            progress: PruneProgress::Finished,
187            segments: Vec::with_capacity(self.segments.len()),
188        };
189
190        for segment in &self.segments {
191            if limiter.is_limit_reached() {
192                break
193            }
194
195            if let Some((to_block, prune_mode)) = segment
196                .mode()
197                .map(|mode| {
198                    mode.prune_target_block(tip_block_number, segment.segment(), segment.purpose())
199                })
200                .transpose()?
201                .flatten()
202            {
203                debug!(
204                    target: "pruner",
205                    segment = ?segment.segment(),
206                    purpose = ?segment.purpose(),
207                    %to_block,
208                    ?prune_mode,
209                    "Segment pruning started"
210                );
211
212                let segment_start = Instant::now();
213                let previous_checkpoint = provider.get_prune_checkpoint(segment.segment())?;
214                let segment_output = segment.prune(
215                    provider,
216                    PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() },
217                )?;
218                if let Some(checkpoint) = segment_output.checkpoint {
219                    segment
220                        .save_checkpoint(provider, checkpoint.as_prune_checkpoint(prune_mode))?;
221                }
222                self.metrics
223                    .get_prune_segment_metrics(segment.segment())
224                    .duration_seconds
225                    .record(segment_start.elapsed());
226                if let Some(highest_pruned_block) =
227                    segment_output.checkpoint.and_then(|checkpoint| checkpoint.block_number)
228                {
229                    self.metrics
230                        .get_prune_segment_metrics(segment.segment())
231                        .highest_pruned_block
232                        .set(highest_pruned_block as f64);
233                }
234
235                output.progress = segment_output.progress;
236                output.segments.push((segment.segment(), segment_output));
237
238                debug!(
239                    target: "pruner",
240                    segment = ?segment.segment(),
241                    purpose = ?segment.purpose(),
242                    %to_block,
243                    ?prune_mode,
244                    %segment_output.pruned,
245                    "Segment pruning finished"
246                );
247
248                if segment_output.pruned > 0 {
249                    limiter.increment_deleted_entries_count_by(segment_output.pruned);
250                    pruned += segment_output.pruned;
251                    let info = PrunedSegmentInfo {
252                        segment: segment.segment(),
253                        pruned: segment_output.pruned,
254                        progress: segment_output.progress,
255                    };
256                    stats.push(info);
257                }
258            } else {
259                debug!(target: "pruner", segment = ?segment.segment(), purpose = ?segment.purpose(), "Nothing to prune for the segment");
260            }
261        }
262
263        Ok((stats, pruned, output))
264    }
265
266    /// Returns `true` if the pruning is needed at the provided tip block number.
267    /// This is determined by the check against minimum pruning interval and last pruned block
268    /// number.
269    pub fn is_pruning_needed(&self, tip_block_number: BlockNumber) -> bool {
270        let Some(tip_block_number) =
271            self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
272        else {
273            return false
274        };
275
276        // Saturating subtraction is needed for the case when the chain was reverted, meaning
277        // current block number might be less than the previous tip block number.
278        // If that's the case, no pruning is needed as outdated data is also reverted.
279        if tip_block_number.saturating_sub(self.previous_tip_block_number.unwrap_or_default()) >=
280            self.min_block_interval as u64
281        {
282            debug!(
283                target: "pruner",
284                previous_tip_block_number = ?self.previous_tip_block_number,
285                %tip_block_number,
286                "Minimum pruning interval reached"
287            );
288            true
289        } else {
290            false
291        }
292    }
293
294    /// Adjusts the tip block number to the finished `ExEx` height. This is needed to not prune more
295    /// data than `ExExs` have processed. Depending on the height:
296    /// - [`FinishedExExHeight::NoExExs`] returns the tip block number as no adjustment for `ExExs`
297    ///   is needed.
298    /// - [`FinishedExExHeight::NotReady`] returns `None` as not all `ExExs` have emitted a
299    ///   `FinishedHeight` event yet.
300    /// - [`FinishedExExHeight::Height`] returns the finished `ExEx` height.
301    fn adjust_tip_block_number_to_finished_exex_height(
302        &self,
303        tip_block_number: BlockNumber,
304    ) -> Option<BlockNumber> {
305        match *self.finished_exex_height.borrow() {
306            FinishedExExHeight::NoExExs => Some(tip_block_number),
307            FinishedExExHeight::NotReady => {
308                debug!(target: "pruner", %tip_block_number, "Not all ExExs have emitted a `FinishedHeight` event yet, can't prune");
309                None
310            }
311            FinishedExExHeight::Height(finished_exex_height) => {
312                debug!(target: "pruner", %tip_block_number, %finished_exex_height, "Adjusting tip block number to the finished ExEx height");
313                Some(finished_exex_height)
314            }
315        }
316    }
317}
318
319impl<PF> Pruner<PF::ProviderRW, PF>
320where
321    PF: DatabaseProviderFactory<ProviderRW: PruneCheckpointWriter + PruneCheckpointReader>,
322{
323    /// Run the pruner. This will only prune data up to the highest finished ExEx height, if there
324    /// are no ExExes.
325    ///
326    /// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
327    /// to prune.
328    pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
329        let provider = self.provider_factory.database_provider_rw()?;
330        let result = self.run_with_provider(&provider, tip_block_number);
331        provider.commit()?;
332        result
333    }
334}
335
#[cfg(test)]
mod tests {
    use crate::Pruner;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;

    #[test]
    fn is_pruning_needed() {
        let factory = create_test_provider_factory();
        let (exex_height_sender, exex_height_receiver) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let mut pruner =
            Pruner::new_with_factory(factory, vec![], 5, 0, None, exex_height_receiver);
        let interval = pruner.min_block_interval as u64;

        // With no previous tip recorded, block 1 is still below the minimum interval.
        let first = 1;
        assert!(!pruner.is_pruning_needed(first));
        pruner.previous_tip_block_number = Some(first);

        // Advancing by exactly the interval triggers pruning.
        let second = first + interval;
        assert!(pruner.is_pruning_needed(second));
        pruner.previous_tip_block_number = Some(second);

        // Staying on the same tip is below the interval.
        assert!(!pruner.is_pruning_needed(second));

        // Advancing by the interval once more triggers pruning again.
        let third = second + interval;
        assert!(pruner.is_pruning_needed(third));

        // While some ExEx has not reported a finished height, pruning is never needed.
        exex_height_sender.send(FinishedExExHeight::NotReady).unwrap();
        assert!(!pruner.is_pruning_needed(third));

        // A finished ExEx height that is still below the threshold holds pruning back.
        exex_height_sender.send(FinishedExExHeight::Height(second)).unwrap();
        assert!(!pruner.is_pruning_needed(third));

        // A finished ExEx height that reaches the threshold allows pruning.
        exex_height_sender.send(FinishedExExHeight::Height(third)).unwrap();
        assert!(pruner.is_pruning_needed(third));
    }
}