1use crate::{
4 segments::{PruneInput, Segment},
5 Metrics, PruneLimiter, PrunerError, PrunerEvent,
6};
7use alloy_primitives::BlockNumber;
8use reth_exex_types::FinishedExExHeight;
9use reth_provider::{
10 DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
11 StageCheckpointReader,
12};
13use reth_prune_types::{PruneProgress, PrunedSegmentInfo, PrunerOutput};
14use reth_stages_types::StageId;
15use reth_tokio_util::{EventSender, EventStream};
16use std::time::{Duration, Instant};
17use tokio::sync::watch;
18use tracing::debug;
19
20pub type PrunerResult = Result<PrunerOutput, PrunerError>;
22
23pub type PrunerWithResult<S, DB> = (Pruner<S, DB>, PrunerResult);
25
26pub type PrunerWithFactory<PF> = Pruner<<PF as DatabaseProviderFactory>::ProviderRW, PF>;
28
29#[derive(Debug)]
31pub struct Pruner<Provider, PF> {
32 provider_factory: PF,
34 segments: Vec<Box<dyn Segment<Provider>>>,
35 min_block_interval: usize,
38 previous_tip_block_number: Option<BlockNumber>,
42 delete_limit: usize,
44 timeout: Option<Duration>,
46 finished_exex_height: watch::Receiver<FinishedExExHeight>,
48 #[doc(hidden)]
49 metrics: Metrics,
50 event_sender: EventSender<PrunerEvent>,
51}
52
53impl<Provider> Pruner<Provider, ()> {
54 pub fn new(
56 segments: Vec<Box<dyn Segment<Provider>>>,
57 min_block_interval: usize,
58 delete_limit: usize,
59 timeout: Option<Duration>,
60 finished_exex_height: watch::Receiver<FinishedExExHeight>,
61 ) -> Self {
62 Self {
63 provider_factory: (),
64 segments,
65 min_block_interval,
66 previous_tip_block_number: None,
67 delete_limit,
68 timeout,
69 finished_exex_height,
70 metrics: Metrics::default(),
71 event_sender: Default::default(),
72 }
73 }
74}
75
76impl<PF> Pruner<PF::ProviderRW, PF>
77where
78 PF: DatabaseProviderFactory,
79{
80 pub fn new_with_factory(
82 provider_factory: PF,
83 segments: Vec<Box<dyn Segment<PF::ProviderRW>>>,
84 min_block_interval: usize,
85 delete_limit: usize,
86 timeout: Option<Duration>,
87 finished_exex_height: watch::Receiver<FinishedExExHeight>,
88 ) -> Self {
89 Self {
90 provider_factory,
91 segments,
92 min_block_interval,
93 previous_tip_block_number: None,
94 delete_limit,
95 timeout,
96 finished_exex_height,
97 metrics: Metrics::default(),
98 event_sender: Default::default(),
99 }
100 }
101}
102
103impl<Provider, S> Pruner<Provider, S>
104where
105 Provider: PruneCheckpointReader + PruneCheckpointWriter + StageCheckpointReader,
106{
107 pub fn events(&self) -> EventStream<PrunerEvent> {
109 self.event_sender.new_listener()
110 }
111
112 pub fn run_with_provider(
118 &mut self,
119 provider: &Provider,
120 tip_block_number: BlockNumber,
121 ) -> PrunerResult {
122 let Some(tip_block_number) =
123 self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
124 else {
125 return Ok(PruneProgress::Finished.into())
126 };
127 if tip_block_number == 0 {
128 self.previous_tip_block_number = Some(tip_block_number);
129
130 debug!(target: "pruner", %tip_block_number, "Nothing to prune yet");
131 return Ok(PruneProgress::Finished.into())
132 }
133
134 self.event_sender.notify(PrunerEvent::Started { tip_block_number });
135
136 debug!(target: "pruner", %tip_block_number, "Pruner started");
137 let start = Instant::now();
138
139 let mut limiter = PruneLimiter::default().set_deleted_entries_limit(self.delete_limit);
140 if let Some(timeout) = self.timeout {
141 limiter = limiter.set_time_limit(timeout);
142 };
143
144 let (stats, deleted_entries, output) =
145 self.prune_segments(provider, tip_block_number, &mut limiter)?;
146
147 self.previous_tip_block_number = Some(tip_block_number);
148
149 let elapsed = start.elapsed();
150 self.metrics.duration_seconds.record(elapsed);
151
152 let message = match output.progress {
153 PruneProgress::HasMoreData(_) => "Pruner interrupted and has more data to prune",
154 PruneProgress::Finished => "Pruner finished",
155 };
156
157 debug!(
158 target: "pruner",
159 %tip_block_number,
160 ?elapsed,
161 ?deleted_entries,
162 ?limiter,
163 ?output,
164 ?stats,
165 "{message}",
166 );
167
168 self.event_sender.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats });
169
170 Ok(output)
171 }
172
173 fn prune_segments(
180 &mut self,
181 provider: &Provider,
182 tip_block_number: BlockNumber,
183 limiter: &mut PruneLimiter,
184 ) -> Result<(Vec<PrunedSegmentInfo>, usize, PrunerOutput), PrunerError> {
185 let mut stats = Vec::with_capacity(self.segments.len());
186 let mut pruned = 0;
187 let mut output = PrunerOutput {
188 progress: PruneProgress::Finished,
189 segments: Vec::with_capacity(self.segments.len()),
190 };
191
192 for segment in &self.segments {
193 if limiter.is_limit_reached() {
194 break
195 }
196
197 if let Some((to_block, prune_mode)) = segment
198 .mode()
199 .map(|mode| {
200 mode.prune_target_block(tip_block_number, segment.segment(), segment.purpose())
201 })
202 .transpose()?
203 .flatten()
204 {
205 if let Some(required_stage) = segment.required_stage() &&
207 !is_stage_finished(provider, required_stage)?
208 {
209 debug!(
210 target: "pruner",
211 segment = ?segment.segment(),
212 ?required_stage,
213 "Segment's required stage not finished, skipping"
214 );
215 continue
216 }
217
218 debug!(
219 target: "pruner",
220 segment = ?segment.segment(),
221 purpose = ?segment.purpose(),
222 %to_block,
223 ?prune_mode,
224 "Segment pruning started"
225 );
226
227 let segment_start = Instant::now();
228 let previous_checkpoint = provider.get_prune_checkpoint(segment.segment())?;
229 let segment_output = segment.prune(
230 provider,
231 PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() },
232 )?;
233 if let Some(checkpoint) = segment_output.checkpoint {
234 segment
235 .save_checkpoint(provider, checkpoint.as_prune_checkpoint(prune_mode))?;
236 }
237 self.metrics
238 .get_prune_segment_metrics(segment.segment())
239 .duration_seconds
240 .record(segment_start.elapsed());
241 if let Some(highest_pruned_block) =
242 segment_output.checkpoint.and_then(|checkpoint| checkpoint.block_number)
243 {
244 self.metrics
245 .get_prune_segment_metrics(segment.segment())
246 .highest_pruned_block
247 .set(highest_pruned_block as f64);
248 }
249
250 output.progress = segment_output.progress;
251 output.segments.push((segment.segment(), segment_output));
252
253 debug!(
254 target: "pruner",
255 segment = ?segment.segment(),
256 purpose = ?segment.purpose(),
257 %to_block,
258 ?prune_mode,
259 %segment_output.pruned,
260 "Segment pruning finished"
261 );
262
263 if segment_output.pruned > 0 {
264 limiter.increment_deleted_entries_count_by(segment_output.pruned);
265 pruned += segment_output.pruned;
266 let info = PrunedSegmentInfo {
267 segment: segment.segment(),
268 pruned: segment_output.pruned,
269 progress: segment_output.progress,
270 };
271 stats.push(info);
272 }
273 } else {
274 debug!(target: "pruner", segment = ?segment.segment(), purpose = ?segment.purpose(), "Nothing to prune for the segment");
275 }
276 }
277
278 Ok((stats, pruned, output))
279 }
280
281 pub fn is_pruning_needed(&self, tip_block_number: BlockNumber) -> bool {
285 let Some(tip_block_number) =
286 self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
287 else {
288 return false
289 };
290
291 if tip_block_number.saturating_sub(self.previous_tip_block_number.unwrap_or_default()) >=
295 self.min_block_interval as u64
296 {
297 debug!(
298 target: "pruner",
299 previous_tip_block_number = ?self.previous_tip_block_number,
300 %tip_block_number,
301 "Minimum pruning interval reached"
302 );
303 true
304 } else {
305 false
306 }
307 }
308
309 fn adjust_tip_block_number_to_finished_exex_height(
317 &self,
318 tip_block_number: BlockNumber,
319 ) -> Option<BlockNumber> {
320 match *self.finished_exex_height.borrow() {
321 FinishedExExHeight::NoExExs => Some(tip_block_number),
322 FinishedExExHeight::NotReady => {
323 debug!(target: "pruner", %tip_block_number, "Not all ExExs have emitted a `FinishedHeight` event yet, can't prune");
324 None
325 }
326 FinishedExExHeight::Height(finished_exex_height) => {
327 debug!(target: "pruner", %tip_block_number, %finished_exex_height, "Adjusting tip block number to the finished ExEx height");
328 Some(finished_exex_height)
329 }
330 }
331 }
332}
333
334impl<PF> Pruner<PF::ProviderRW, PF>
335where
336 PF: DatabaseProviderFactory<
337 ProviderRW: PruneCheckpointWriter + PruneCheckpointReader + StageCheckpointReader,
338 >,
339{
340 pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
346 let provider = self.provider_factory.database_provider_rw()?;
347 let result = self.run_with_provider(&provider, tip_block_number);
348 provider.commit()?;
349 result
350 }
351}
352
353fn is_stage_finished<Provider: StageCheckpointReader>(
357 provider: &Provider,
358 stage_id: StageId,
359) -> Result<bool, PrunerError> {
360 let stage_checkpoint = provider.get_stage_checkpoint(stage_id)?.map(|c| c.block_number);
361 let finish_checkpoint = provider.get_stage_checkpoint(StageId::Finish)?.map(|c| c.block_number);
362
363 Ok(stage_checkpoint >= finish_checkpoint)
364}
365
366#[cfg(test)]
367mod tests {
368 use crate::Pruner;
369 use reth_exex_types::FinishedExExHeight;
370 use reth_provider::test_utils::create_test_provider_factory;
371
372 #[test]
373 fn is_pruning_needed() {
374 let provider_factory = create_test_provider_factory();
375
376 let (finished_exex_height_tx, finished_exex_height_rx) =
377 tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
378
379 let mut pruner =
380 Pruner::new_with_factory(provider_factory, vec![], 5, 0, None, finished_exex_height_rx);
381
382 let first_block_number = 1;
384 assert!(!pruner.is_pruning_needed(first_block_number));
385 pruner.previous_tip_block_number = Some(first_block_number);
386
387 let second_block_number = first_block_number + pruner.min_block_interval as u64;
389 assert!(pruner.is_pruning_needed(second_block_number));
390 pruner.previous_tip_block_number = Some(second_block_number);
391
392 assert!(!pruner.is_pruning_needed(second_block_number));
394
395 let third_block_number = second_block_number + pruner.min_block_interval as u64;
397 assert!(pruner.is_pruning_needed(third_block_number));
398
399 finished_exex_height_tx.send(FinishedExExHeight::NotReady).unwrap();
401 assert!(!pruner.is_pruning_needed(third_block_number));
402
403 finished_exex_height_tx.send(FinishedExExHeight::Height(second_block_number)).unwrap();
405 assert!(!pruner.is_pruning_needed(third_block_number));
406
407 finished_exex_height_tx.send(FinishedExExHeight::Height(third_block_number)).unwrap();
409 assert!(pruner.is_pruning_needed(third_block_number));
410 }
411}