1use crate::{
4 segments::{PruneInput, Segment},
5 Metrics, PruneLimiter, PrunerError, PrunerEvent,
6};
7use alloy_primitives::BlockNumber;
8use reth_exex_types::FinishedExExHeight;
9use reth_primitives_traits::FastInstant as Instant;
10use reth_provider::{
11 DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
12 StageCheckpointReader,
13};
14use reth_prune_types::{PruneProgress, PrunedSegmentInfo, PrunerOutput};
15use reth_stages_types::StageId;
16use reth_tokio_util::{EventSender, EventStream};
17use std::time::Duration;
18use tokio::sync::watch;
19use tracing::{debug, instrument};
20
/// Result of a pruner run: the [`PrunerOutput`] on success, or a [`PrunerError`] on failure.
pub type PrunerResult = Result<PrunerOutput, PrunerError>;
23
/// A [`Pruner`] paired with the [`PrunerResult`] of a run.
pub type PrunerWithResult<S, DB> = (Pruner<S, DB>, PrunerResult);
26
/// A [`Pruner`] that owns a provider factory `PF` and whose segments operate on the factory's
/// read-write provider type.
pub type PrunerWithFactory<PF> = Pruner<<PF as DatabaseProviderFactory>::ProviderRW, PF>;
29
/// Drives a list of prune [`Segment`]s against a provider, subject to a per-run limit on deleted
/// entries and an optional time limit.
#[derive(Debug)]
pub struct Pruner<Provider, PF> {
    /// Provider factory used by `run` to open a read-write provider. It is `()` when the pruner
    /// is built with `new`.
    provider_factory: PF,
    /// Segments to prune, visited in order on every run.
    segments: Vec<Box<dyn Segment<Provider>>>,
    /// Minimum number of blocks the (adjusted) tip must be ahead of `previous_tip_block_number`
    /// for `is_pruning_needed` to return `true`.
    min_block_interval: usize,
    /// Tip block number recorded by the last call to `run_with_provider` that got past the
    /// finished-`ExEx`-height check; `None` until then.
    previous_tip_block_number: Option<BlockNumber>,
    /// Limit on deleted entries handed to the [`PruneLimiter`] of each run.
    delete_limit: usize,
    /// Optional time limit handed to the [`PruneLimiter`] of each run.
    timeout: Option<Duration>,
    /// Optional minimum pruning distance forwarded to `prune_target_block_with_min` when the
    /// target block of each segment is computed.
    minimum_pruning_distance: Option<u64>,
    /// Watch channel receiver carrying the current [`FinishedExExHeight`]; it is read before
    /// every run to adjust the tip block number.
    finished_exex_height: watch::Receiver<FinishedExExHeight>,
    /// Metrics recorded by the pruner (run duration, per-segment duration and highest pruned
    /// block).
    #[doc(hidden)]
    metrics: Metrics,
    /// Sender used to notify listeners about [`PrunerEvent`]s.
    event_sender: EventSender<PrunerEvent>,
}
56
57impl<Provider> Pruner<Provider, ()> {
58 pub fn new(
60 segments: Vec<Box<dyn Segment<Provider>>>,
61 min_block_interval: usize,
62 delete_limit: usize,
63 timeout: Option<Duration>,
64 finished_exex_height: watch::Receiver<FinishedExExHeight>,
65 ) -> Self {
66 Self {
67 provider_factory: (),
68 segments,
69 min_block_interval,
70 previous_tip_block_number: None,
71 delete_limit,
72 timeout,
73 minimum_pruning_distance: None,
74 finished_exex_height,
75 metrics: Metrics::default(),
76 event_sender: Default::default(),
77 }
78 }
79}
80
81impl<PF> Pruner<PF::ProviderRW, PF>
82where
83 PF: DatabaseProviderFactory,
84{
85 pub fn new_with_factory(
87 provider_factory: PF,
88 segments: Vec<Box<dyn Segment<PF::ProviderRW>>>,
89 min_block_interval: usize,
90 delete_limit: usize,
91 timeout: Option<Duration>,
92 finished_exex_height: watch::Receiver<FinishedExExHeight>,
93 ) -> Self {
94 Self {
95 provider_factory,
96 segments,
97 min_block_interval,
98 previous_tip_block_number: None,
99 delete_limit,
100 timeout,
101 minimum_pruning_distance: None,
102 finished_exex_height,
103 metrics: Metrics::default(),
104 event_sender: Default::default(),
105 }
106 }
107}
108
109impl<Provider, S> Pruner<Provider, S> {
110 pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
112 self.minimum_pruning_distance = Some(distance);
113 self
114 }
115}
116
117impl<Provider, S> Pruner<Provider, S>
118where
119 Provider: PruneCheckpointReader + PruneCheckpointWriter + StageCheckpointReader,
120{
121 pub fn events(&self) -> EventStream<PrunerEvent> {
123 self.event_sender.new_listener()
124 }
125
126 #[instrument(
132 name = "Pruner::run_with_provider",
133 level = "debug",
134 target = "pruner",
135 skip(self, provider)
136 )]
137 pub fn run_with_provider(
138 &mut self,
139 provider: &Provider,
140 tip_block_number: BlockNumber,
141 ) -> PrunerResult {
142 let Some(tip_block_number) =
143 self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
144 else {
145 return Ok(PruneProgress::Finished.into())
146 };
147 if tip_block_number == 0 {
148 self.previous_tip_block_number = Some(tip_block_number);
149
150 debug!(target: "pruner", %tip_block_number, "Nothing to prune yet");
151 return Ok(PruneProgress::Finished.into())
152 }
153
154 self.event_sender.notify(PrunerEvent::Started { tip_block_number });
155
156 debug!(target: "pruner", %tip_block_number, "Pruner started");
157 let start = Instant::now();
158
159 let mut limiter = PruneLimiter::default().set_deleted_entries_limit(self.delete_limit);
160 if let Some(timeout) = self.timeout {
161 limiter = limiter.set_time_limit(timeout);
162 };
163
164 let (stats, deleted_entries, output) =
165 self.prune_segments(provider, tip_block_number, &mut limiter)?;
166
167 self.previous_tip_block_number = Some(tip_block_number);
168
169 let elapsed = start.elapsed();
170 self.metrics.duration_seconds.record(elapsed);
171
172 output.debug_log(tip_block_number, deleted_entries, elapsed);
173
174 self.event_sender.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats });
175
176 Ok(output)
177 }
178
179 #[instrument(level = "debug", target = "pruner", skip_all, fields(segments = self.segments.len()))]
186 fn prune_segments(
187 &mut self,
188 provider: &Provider,
189 tip_block_number: BlockNumber,
190 limiter: &mut PruneLimiter,
191 ) -> Result<(Vec<PrunedSegmentInfo>, usize, PrunerOutput), PrunerError> {
192 let mut stats = Vec::with_capacity(self.segments.len());
193 let mut pruned = 0;
194 let mut output = PrunerOutput {
195 progress: PruneProgress::Finished,
196 segments: Vec::with_capacity(self.segments.len()),
197 };
198
199 for segment in &self.segments {
200 if limiter.is_limit_reached() {
201 output.progress =
202 output.progress.combine(PruneProgress::HasMoreData(limiter.interrupt_reason()));
203 break
204 }
205
206 if let Some((to_block, prune_mode)) = segment
207 .mode()
208 .map(|mode| {
209 mode.prune_target_block_with_min(
210 tip_block_number,
211 segment.segment(),
212 segment.purpose(),
213 self.minimum_pruning_distance,
214 )
215 })
216 .transpose()?
217 .flatten()
218 {
219 if let Some(required_stage) = segment.required_stage() &&
221 !is_stage_finished(provider, required_stage)?
222 {
223 debug!(
224 target: "pruner",
225 segment = ?segment.segment(),
226 ?required_stage,
227 "Segment's required stage not finished, skipping"
228 );
229 continue
230 }
231
232 debug!(
233 target: "pruner",
234 segment = ?segment.segment(),
235 purpose = ?segment.purpose(),
236 %to_block,
237 ?prune_mode,
238 "Segment pruning started"
239 );
240
241 let segment_start = Instant::now();
242 let previous_checkpoint = provider.get_prune_checkpoint(segment.segment())?;
243 let segment_output = segment.prune(
244 provider,
245 PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() },
246 )?;
247 if let Some(checkpoint) = segment_output.checkpoint {
248 segment
249 .save_checkpoint(provider, checkpoint.as_prune_checkpoint(prune_mode))?;
250 }
251 self.metrics
252 .get_prune_segment_metrics(segment.segment())
253 .duration_seconds
254 .record(segment_start.elapsed());
255 if let Some(highest_pruned_block) =
256 segment_output.checkpoint.and_then(|checkpoint| checkpoint.block_number)
257 {
258 self.metrics
259 .get_prune_segment_metrics(segment.segment())
260 .highest_pruned_block
261 .set(highest_pruned_block as f64);
262 }
263
264 output.progress = output.progress.combine(segment_output.progress);
265 output.segments.push((segment.segment(), segment_output));
266
267 debug!(
268 target: "pruner",
269 segment = ?segment.segment(),
270 purpose = ?segment.purpose(),
271 %to_block,
272 ?prune_mode,
273 %segment_output.pruned,
274 "Segment pruning finished"
275 );
276
277 if segment_output.pruned > 0 {
278 limiter.increment_deleted_entries_count_by(segment_output.pruned);
279 pruned += segment_output.pruned;
280 let info = PrunedSegmentInfo {
281 segment: segment.segment(),
282 pruned: segment_output.pruned,
283 progress: segment_output.progress,
284 };
285 stats.push(info);
286 }
287 } else {
288 debug!(target: "pruner", segment = ?segment.segment(), purpose = ?segment.purpose(), "Nothing to prune for the segment");
289 }
290 }
291
292 Ok((stats, pruned, output))
293 }
294
295 pub fn is_pruning_needed(&self, tip_block_number: BlockNumber) -> bool {
299 let Some(tip_block_number) =
300 self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
301 else {
302 return false
303 };
304
305 if tip_block_number.saturating_sub(self.previous_tip_block_number.unwrap_or_default()) >=
309 self.min_block_interval as u64
310 {
311 debug!(
312 target: "pruner",
313 previous_tip_block_number = ?self.previous_tip_block_number,
314 %tip_block_number,
315 "Minimum pruning interval reached"
316 );
317 true
318 } else {
319 false
320 }
321 }
322
323 fn adjust_tip_block_number_to_finished_exex_height(
331 &self,
332 tip_block_number: BlockNumber,
333 ) -> Option<BlockNumber> {
334 match *self.finished_exex_height.borrow() {
335 FinishedExExHeight::NoExExs => Some(tip_block_number),
336 FinishedExExHeight::NotReady => {
337 debug!(target: "pruner", %tip_block_number, "Not all ExExs have emitted a `FinishedHeight` event yet, can't prune");
338 None
339 }
340 FinishedExExHeight::Height(finished_exex_height) => {
341 debug!(target: "pruner", %tip_block_number, %finished_exex_height, "Adjusting tip block number to the finished ExEx height");
342 Some(finished_exex_height)
343 }
344 }
345 }
346}
347
348impl<PF> Pruner<PF::ProviderRW, PF>
349where
350 PF: DatabaseProviderFactory<
351 ProviderRW: PruneCheckpointWriter + PruneCheckpointReader + StageCheckpointReader,
352 >,
353{
354 #[instrument(name = "Pruner::run", level = "debug", target = "pruner", skip(self))]
360 pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
361 let provider = self.provider_factory.database_provider_rw()?;
362 let result = self.run_with_provider(&provider, tip_block_number);
363 provider.commit()?;
364 result
365 }
366}
367
368fn is_stage_finished<Provider: StageCheckpointReader>(
372 provider: &Provider,
373 stage_id: StageId,
374) -> Result<bool, PrunerError> {
375 let stage_checkpoint = provider.get_stage_checkpoint(stage_id)?.map(|c| c.block_number);
376 let finish_checkpoint = provider.get_stage_checkpoint(StageId::Finish)?.map(|c| c.block_number);
377
378 Ok(stage_checkpoint >= finish_checkpoint)
379}
380
#[cfg(test)]
mod tests {
    use crate::Pruner;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;

    /// Walks `is_pruning_needed` through the block-interval check and every
    /// `FinishedExExHeight` state.
    #[test]
    fn is_pruning_needed() {
        let factory = create_test_provider_factory();

        let (exex_height_tx, exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let mut pruner = Pruner::new_with_factory(factory, vec![], 5, 0, None, exex_height_rx);
        let interval = pruner.min_block_interval as u64;

        // No previous tip is recorded and block 1 is closer than the interval to block 0.
        let first_tip = 1;
        assert!(!pruner.is_pruning_needed(first_tip));
        pruner.previous_tip_block_number = Some(first_tip);

        // Exactly one interval past the previous tip.
        let second_tip = first_tip + interval;
        assert!(pruner.is_pruning_needed(second_tip));
        pruner.previous_tip_block_number = Some(second_tip);

        // Same block as the previous tip: the delta is zero.
        assert!(!pruner.is_pruning_needed(second_tip));

        // One more interval past the previous tip.
        let third_tip = second_tip + interval;
        assert!(pruner.is_pruning_needed(third_tip));

        // ExExs are present but not ready yet: pruning is never needed.
        exex_height_tx.send(FinishedExExHeight::NotReady).unwrap();
        assert!(!pruner.is_pruning_needed(third_tip));

        // The finished height equals the previous tip: the adjusted delta is zero.
        exex_height_tx.send(FinishedExExHeight::Height(second_tip)).unwrap();
        assert!(!pruner.is_pruning_needed(third_tip));

        // The finished height caught up with the tip: the interval is reached again.
        exex_height_tx.send(FinishedExExHeight::Height(third_tip)).unwrap();
        assert!(pruner.is_pruning_needed(third_tip));
    }
}