1use crate::{
4 segments::{PruneInput, Segment},
5 Metrics, PruneLimiter, PrunerError, PrunerEvent,
6};
7use alloy_primitives::BlockNumber;
8use reth_exex_types::FinishedExExHeight;
9use reth_provider::{
10 DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
11 StageCheckpointReader,
12};
13use reth_prune_types::{PruneProgress, PrunedSegmentInfo, PrunerOutput};
14use reth_stages_types::StageId;
15use reth_tokio_util::{EventSender, EventStream};
16use std::time::{Duration, Instant};
17use tokio::sync::watch;
18use tracing::{debug, instrument};
19
/// Result of a pruner run: a [`PrunerOutput`] on success, a [`PrunerError`] on failure.
pub type PrunerResult = Result<PrunerOutput, PrunerError>;

/// A [`Pruner`] paired with a [`PrunerResult`].
pub type PrunerWithResult<S, DB> = (Pruner<S, DB>, PrunerResult);

/// A [`Pruner`] whose provider type is the read-write provider produced by the factory `PF`.
pub type PrunerWithFactory<PF> = Pruner<<PF as DatabaseProviderFactory>::ProviderRW, PF>;
28
/// Pruning routine: holds the configured segments together with the limits and state used to
/// decide when a run is needed and how much a single run may delete.
#[derive(Debug)]
pub struct Pruner<Provider, PF> {
    /// Provider factory. It is `()` when the pruner is built with `Pruner::new`; `Pruner::run`
    /// uses it to open a read-write provider.
    provider_factory: PF,
    /// Segments to prune; a run visits them in this order.
    segments: Vec<Box<dyn Segment<Provider>>>,
    /// Minimum distance, in blocks, between the previously recorded tip and the current tip for
    /// `is_pruning_needed` to return `true`.
    min_block_interval: usize,
    /// Tip block number (after the ExEx height adjustment) recorded by the most recent run.
    /// `None` until a run has recorded one.
    previous_tip_block_number: Option<BlockNumber>,
    /// Deleted-entries limit handed to the `PruneLimiter` created for each run.
    delete_limit: usize,
    /// Optional time limit handed to the `PruneLimiter` created for each run.
    timeout: Option<Duration>,
    /// Watch receiver for the finished ExEx height. `NotReady` prevents pruning, `Height(h)`
    /// replaces the tip block number with `h`, and `NoExExs` leaves the tip unchanged.
    finished_exex_height: watch::Receiver<FinishedExExHeight>,
    /// Metrics recorded for whole runs and for individual segments.
    #[doc(hidden)]
    metrics: Metrics,
    /// Sender used to publish `PrunerEvent`s to listeners obtained through `events`.
    event_sender: EventSender<PrunerEvent>,
}
52
53impl<Provider> Pruner<Provider, ()> {
54 pub fn new(
56 segments: Vec<Box<dyn Segment<Provider>>>,
57 min_block_interval: usize,
58 delete_limit: usize,
59 timeout: Option<Duration>,
60 finished_exex_height: watch::Receiver<FinishedExExHeight>,
61 ) -> Self {
62 Self {
63 provider_factory: (),
64 segments,
65 min_block_interval,
66 previous_tip_block_number: None,
67 delete_limit,
68 timeout,
69 finished_exex_height,
70 metrics: Metrics::default(),
71 event_sender: Default::default(),
72 }
73 }
74}
75
76impl<PF> Pruner<PF::ProviderRW, PF>
77where
78 PF: DatabaseProviderFactory,
79{
80 pub fn new_with_factory(
82 provider_factory: PF,
83 segments: Vec<Box<dyn Segment<PF::ProviderRW>>>,
84 min_block_interval: usize,
85 delete_limit: usize,
86 timeout: Option<Duration>,
87 finished_exex_height: watch::Receiver<FinishedExExHeight>,
88 ) -> Self {
89 Self {
90 provider_factory,
91 segments,
92 min_block_interval,
93 previous_tip_block_number: None,
94 delete_limit,
95 timeout,
96 finished_exex_height,
97 metrics: Metrics::default(),
98 event_sender: Default::default(),
99 }
100 }
101}
102
103impl<Provider, S> Pruner<Provider, S>
104where
105 Provider: PruneCheckpointReader + PruneCheckpointWriter + StageCheckpointReader,
106{
    /// Returns a new stream of [`PrunerEvent`]s, obtained by registering a fresh listener on the
    /// pruner's event sender.
    pub fn events(&self) -> EventStream<PrunerEvent> {
        self.event_sender.new_listener()
    }
111
112 #[instrument(
118 name = "Pruner::run_with_provider",
119 level = "debug",
120 target = "pruner",
121 skip(self, provider)
122 )]
123 pub fn run_with_provider(
124 &mut self,
125 provider: &Provider,
126 tip_block_number: BlockNumber,
127 ) -> PrunerResult {
128 let Some(tip_block_number) =
129 self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
130 else {
131 return Ok(PruneProgress::Finished.into())
132 };
133 if tip_block_number == 0 {
134 self.previous_tip_block_number = Some(tip_block_number);
135
136 debug!(target: "pruner", %tip_block_number, "Nothing to prune yet");
137 return Ok(PruneProgress::Finished.into())
138 }
139
140 self.event_sender.notify(PrunerEvent::Started { tip_block_number });
141
142 debug!(target: "pruner", %tip_block_number, "Pruner started");
143 let start = Instant::now();
144
145 let mut limiter = PruneLimiter::default().set_deleted_entries_limit(self.delete_limit);
146 if let Some(timeout) = self.timeout {
147 limiter = limiter.set_time_limit(timeout);
148 };
149
150 let (stats, deleted_entries, output) =
151 self.prune_segments(provider, tip_block_number, &mut limiter)?;
152
153 self.previous_tip_block_number = Some(tip_block_number);
154
155 let elapsed = start.elapsed();
156 self.metrics.duration_seconds.record(elapsed);
157
158 output.debug_log(tip_block_number, deleted_entries, elapsed);
159
160 self.event_sender.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats });
161
162 Ok(output)
163 }
164
165 #[instrument(level = "debug", target = "pruner", skip_all, fields(segments = self.segments.len()))]
172 fn prune_segments(
173 &mut self,
174 provider: &Provider,
175 tip_block_number: BlockNumber,
176 limiter: &mut PruneLimiter,
177 ) -> Result<(Vec<PrunedSegmentInfo>, usize, PrunerOutput), PrunerError> {
178 let mut stats = Vec::with_capacity(self.segments.len());
179 let mut pruned = 0;
180 let mut output = PrunerOutput {
181 progress: PruneProgress::Finished,
182 segments: Vec::with_capacity(self.segments.len()),
183 };
184
185 for segment in &self.segments {
186 if limiter.is_limit_reached() {
187 output.progress =
188 output.progress.combine(PruneProgress::HasMoreData(limiter.interrupt_reason()));
189 break
190 }
191
192 if let Some((to_block, prune_mode)) = segment
193 .mode()
194 .map(|mode| {
195 mode.prune_target_block(tip_block_number, segment.segment(), segment.purpose())
196 })
197 .transpose()?
198 .flatten()
199 {
200 if let Some(required_stage) = segment.required_stage() &&
202 !is_stage_finished(provider, required_stage)?
203 {
204 debug!(
205 target: "pruner",
206 segment = ?segment.segment(),
207 ?required_stage,
208 "Segment's required stage not finished, skipping"
209 );
210 continue
211 }
212
213 debug!(
214 target: "pruner",
215 segment = ?segment.segment(),
216 purpose = ?segment.purpose(),
217 %to_block,
218 ?prune_mode,
219 "Segment pruning started"
220 );
221
222 let segment_start = Instant::now();
223 let previous_checkpoint = provider.get_prune_checkpoint(segment.segment())?;
224 let segment_output = segment.prune(
225 provider,
226 PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() },
227 )?;
228 if let Some(checkpoint) = segment_output.checkpoint {
229 segment
230 .save_checkpoint(provider, checkpoint.as_prune_checkpoint(prune_mode))?;
231 }
232 self.metrics
233 .get_prune_segment_metrics(segment.segment())
234 .duration_seconds
235 .record(segment_start.elapsed());
236 if let Some(highest_pruned_block) =
237 segment_output.checkpoint.and_then(|checkpoint| checkpoint.block_number)
238 {
239 self.metrics
240 .get_prune_segment_metrics(segment.segment())
241 .highest_pruned_block
242 .set(highest_pruned_block as f64);
243 }
244
245 output.progress = output.progress.combine(segment_output.progress);
246 output.segments.push((segment.segment(), segment_output));
247
248 debug!(
249 target: "pruner",
250 segment = ?segment.segment(),
251 purpose = ?segment.purpose(),
252 %to_block,
253 ?prune_mode,
254 %segment_output.pruned,
255 "Segment pruning finished"
256 );
257
258 if segment_output.pruned > 0 {
259 limiter.increment_deleted_entries_count_by(segment_output.pruned);
260 pruned += segment_output.pruned;
261 let info = PrunedSegmentInfo {
262 segment: segment.segment(),
263 pruned: segment_output.pruned,
264 progress: segment_output.progress,
265 };
266 stats.push(info);
267 }
268 } else {
269 debug!(target: "pruner", segment = ?segment.segment(), purpose = ?segment.purpose(), "Nothing to prune for the segment");
270 }
271 }
272
273 Ok((stats, pruned, output))
274 }
275
276 pub fn is_pruning_needed(&self, tip_block_number: BlockNumber) -> bool {
280 let Some(tip_block_number) =
281 self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
282 else {
283 return false
284 };
285
286 if tip_block_number.saturating_sub(self.previous_tip_block_number.unwrap_or_default()) >=
290 self.min_block_interval as u64
291 {
292 debug!(
293 target: "pruner",
294 previous_tip_block_number = ?self.previous_tip_block_number,
295 %tip_block_number,
296 "Minimum pruning interval reached"
297 );
298 true
299 } else {
300 false
301 }
302 }
303
304 fn adjust_tip_block_number_to_finished_exex_height(
312 &self,
313 tip_block_number: BlockNumber,
314 ) -> Option<BlockNumber> {
315 match *self.finished_exex_height.borrow() {
316 FinishedExExHeight::NoExExs => Some(tip_block_number),
317 FinishedExExHeight::NotReady => {
318 debug!(target: "pruner", %tip_block_number, "Not all ExExs have emitted a `FinishedHeight` event yet, can't prune");
319 None
320 }
321 FinishedExExHeight::Height(finished_exex_height) => {
322 debug!(target: "pruner", %tip_block_number, %finished_exex_height, "Adjusting tip block number to the finished ExEx height");
323 Some(finished_exex_height)
324 }
325 }
326 }
327}
328
329impl<PF> Pruner<PF::ProviderRW, PF>
330where
331 PF: DatabaseProviderFactory<
332 ProviderRW: PruneCheckpointWriter + PruneCheckpointReader + StageCheckpointReader,
333 >,
334{
335 #[instrument(name = "Pruner::run", level = "debug", target = "pruner", skip(self))]
341 pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
342 let provider = self.provider_factory.database_provider_rw()?;
343 let result = self.run_with_provider(&provider, tip_block_number);
344 provider.commit()?;
345 result
346 }
347}
348
349fn is_stage_finished<Provider: StageCheckpointReader>(
353 provider: &Provider,
354 stage_id: StageId,
355) -> Result<bool, PrunerError> {
356 let stage_checkpoint = provider.get_stage_checkpoint(stage_id)?.map(|c| c.block_number);
357 let finish_checkpoint = provider.get_stage_checkpoint(StageId::Finish)?.map(|c| c.block_number);
358
359 Ok(stage_checkpoint >= finish_checkpoint)
360}
361
#[cfg(test)]
mod tests {
    use crate::Pruner;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;

    #[test]
    fn is_pruning_needed() {
        let factory = create_test_provider_factory();
        let (exex_height_tx, exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let mut pruner = Pruner::new_with_factory(factory, vec![], 5, 0, None, exex_height_rx);
        let interval = pruner.min_block_interval as u64;

        // No previous tip recorded yet: distance from block 0 is below the interval.
        let first_tip = 1;
        assert!(!pruner.is_pruning_needed(first_tip));
        pruner.previous_tip_block_number = Some(first_tip);

        // Exactly one interval ahead of the previous tip.
        let second_tip = first_tip + interval;
        assert!(pruner.is_pruning_needed(second_tip));
        pruner.previous_tip_block_number = Some(second_tip);

        // Same tip as the previous one: distance is zero.
        assert!(!pruner.is_pruning_needed(second_tip));

        // One more interval ahead.
        let third_tip = second_tip + interval;
        assert!(pruner.is_pruning_needed(third_tip));

        // ExEx height not ready: pruning is never needed.
        exex_height_tx.send(FinishedExExHeight::NotReady).unwrap();
        assert!(!pruner.is_pruning_needed(third_tip));

        // ExEx height equal to the previous tip: the adjusted tip has not advanced.
        exex_height_tx.send(FinishedExExHeight::Height(second_tip)).unwrap();
        assert!(!pruner.is_pruning_needed(third_tip));

        // ExEx height caught up with the tip: the interval is reached again.
        exex_height_tx.send(FinishedExExHeight::Height(third_tip)).unwrap();
        assert!(pruner.is_pruning_needed(third_tip));
    }
}