1use crate::{
4 segments::{PruneInput, Segment},
5 Metrics, PruneLimiter, PrunerError, PrunerEvent,
6};
7use alloy_primitives::BlockNumber;
8use reth_exex_types::FinishedExExHeight;
9use reth_primitives_traits::FastInstant as Instant;
10use reth_provider::{
11 DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
12 StageCheckpointReader,
13};
14use reth_prune_types::{PruneProgress, PrunedSegmentInfo, PrunerOutput};
15use reth_stages_types::StageId;
16use reth_tokio_util::{EventSender, EventStream};
17use std::time::Duration;
18use tokio::sync::watch;
19use tracing::{debug, instrument};
20
21pub type PrunerResult = Result<PrunerOutput, PrunerError>;
23
24pub type PrunerWithResult<S, DB> = (Pruner<S, DB>, PrunerResult);
26
27pub type PrunerWithFactory<PF> = Pruner<<PF as DatabaseProviderFactory>::ProviderRW, PF>;
29
30#[derive(Debug)]
32pub struct Pruner<Provider, PF> {
33 provider_factory: PF,
35 segments: Vec<Box<dyn Segment<Provider>>>,
36 min_block_interval: usize,
39 previous_tip_block_number: Option<BlockNumber>,
43 delete_limit: usize,
45 timeout: Option<Duration>,
47 finished_exex_height: watch::Receiver<FinishedExExHeight>,
49 #[doc(hidden)]
50 metrics: Metrics,
51 event_sender: EventSender<PrunerEvent>,
52}
53
54impl<Provider> Pruner<Provider, ()> {
55 pub fn new(
57 segments: Vec<Box<dyn Segment<Provider>>>,
58 min_block_interval: usize,
59 delete_limit: usize,
60 timeout: Option<Duration>,
61 finished_exex_height: watch::Receiver<FinishedExExHeight>,
62 ) -> Self {
63 Self {
64 provider_factory: (),
65 segments,
66 min_block_interval,
67 previous_tip_block_number: None,
68 delete_limit,
69 timeout,
70 finished_exex_height,
71 metrics: Metrics::default(),
72 event_sender: Default::default(),
73 }
74 }
75}
76
77impl<PF> Pruner<PF::ProviderRW, PF>
78where
79 PF: DatabaseProviderFactory,
80{
81 pub fn new_with_factory(
83 provider_factory: PF,
84 segments: Vec<Box<dyn Segment<PF::ProviderRW>>>,
85 min_block_interval: usize,
86 delete_limit: usize,
87 timeout: Option<Duration>,
88 finished_exex_height: watch::Receiver<FinishedExExHeight>,
89 ) -> Self {
90 Self {
91 provider_factory,
92 segments,
93 min_block_interval,
94 previous_tip_block_number: None,
95 delete_limit,
96 timeout,
97 finished_exex_height,
98 metrics: Metrics::default(),
99 event_sender: Default::default(),
100 }
101 }
102}
103
104impl<Provider, S> Pruner<Provider, S>
105where
106 Provider: PruneCheckpointReader + PruneCheckpointWriter + StageCheckpointReader,
107{
108 pub fn events(&self) -> EventStream<PrunerEvent> {
110 self.event_sender.new_listener()
111 }
112
113 #[instrument(
119 name = "Pruner::run_with_provider",
120 level = "debug",
121 target = "pruner",
122 skip(self, provider)
123 )]
124 pub fn run_with_provider(
125 &mut self,
126 provider: &Provider,
127 tip_block_number: BlockNumber,
128 ) -> PrunerResult {
129 let Some(tip_block_number) =
130 self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
131 else {
132 return Ok(PruneProgress::Finished.into())
133 };
134 if tip_block_number == 0 {
135 self.previous_tip_block_number = Some(tip_block_number);
136
137 debug!(target: "pruner", %tip_block_number, "Nothing to prune yet");
138 return Ok(PruneProgress::Finished.into())
139 }
140
141 self.event_sender.notify(PrunerEvent::Started { tip_block_number });
142
143 debug!(target: "pruner", %tip_block_number, "Pruner started");
144 let start = Instant::now();
145
146 let mut limiter = PruneLimiter::default().set_deleted_entries_limit(self.delete_limit);
147 if let Some(timeout) = self.timeout {
148 limiter = limiter.set_time_limit(timeout);
149 };
150
151 let (stats, deleted_entries, output) =
152 self.prune_segments(provider, tip_block_number, &mut limiter)?;
153
154 self.previous_tip_block_number = Some(tip_block_number);
155
156 let elapsed = start.elapsed();
157 self.metrics.duration_seconds.record(elapsed);
158
159 output.debug_log(tip_block_number, deleted_entries, elapsed);
160
161 self.event_sender.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats });
162
163 Ok(output)
164 }
165
166 #[instrument(level = "debug", target = "pruner", skip_all, fields(segments = self.segments.len()))]
173 fn prune_segments(
174 &mut self,
175 provider: &Provider,
176 tip_block_number: BlockNumber,
177 limiter: &mut PruneLimiter,
178 ) -> Result<(Vec<PrunedSegmentInfo>, usize, PrunerOutput), PrunerError> {
179 let mut stats = Vec::with_capacity(self.segments.len());
180 let mut pruned = 0;
181 let mut output = PrunerOutput {
182 progress: PruneProgress::Finished,
183 segments: Vec::with_capacity(self.segments.len()),
184 };
185
186 for segment in &self.segments {
187 if limiter.is_limit_reached() {
188 output.progress =
189 output.progress.combine(PruneProgress::HasMoreData(limiter.interrupt_reason()));
190 break
191 }
192
193 if let Some((to_block, prune_mode)) = segment
194 .mode()
195 .map(|mode| {
196 mode.prune_target_block(tip_block_number, segment.segment(), segment.purpose())
197 })
198 .transpose()?
199 .flatten()
200 {
201 if let Some(required_stage) = segment.required_stage() &&
203 !is_stage_finished(provider, required_stage)?
204 {
205 debug!(
206 target: "pruner",
207 segment = ?segment.segment(),
208 ?required_stage,
209 "Segment's required stage not finished, skipping"
210 );
211 continue
212 }
213
214 debug!(
215 target: "pruner",
216 segment = ?segment.segment(),
217 purpose = ?segment.purpose(),
218 %to_block,
219 ?prune_mode,
220 "Segment pruning started"
221 );
222
223 let segment_start = Instant::now();
224 let previous_checkpoint = provider.get_prune_checkpoint(segment.segment())?;
225 let segment_output = segment.prune(
226 provider,
227 PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() },
228 )?;
229 if let Some(checkpoint) = segment_output.checkpoint {
230 segment
231 .save_checkpoint(provider, checkpoint.as_prune_checkpoint(prune_mode))?;
232 }
233 self.metrics
234 .get_prune_segment_metrics(segment.segment())
235 .duration_seconds
236 .record(segment_start.elapsed());
237 if let Some(highest_pruned_block) =
238 segment_output.checkpoint.and_then(|checkpoint| checkpoint.block_number)
239 {
240 self.metrics
241 .get_prune_segment_metrics(segment.segment())
242 .highest_pruned_block
243 .set(highest_pruned_block as f64);
244 }
245
246 output.progress = output.progress.combine(segment_output.progress);
247 output.segments.push((segment.segment(), segment_output));
248
249 debug!(
250 target: "pruner",
251 segment = ?segment.segment(),
252 purpose = ?segment.purpose(),
253 %to_block,
254 ?prune_mode,
255 %segment_output.pruned,
256 "Segment pruning finished"
257 );
258
259 if segment_output.pruned > 0 {
260 limiter.increment_deleted_entries_count_by(segment_output.pruned);
261 pruned += segment_output.pruned;
262 let info = PrunedSegmentInfo {
263 segment: segment.segment(),
264 pruned: segment_output.pruned,
265 progress: segment_output.progress,
266 };
267 stats.push(info);
268 }
269 } else {
270 debug!(target: "pruner", segment = ?segment.segment(), purpose = ?segment.purpose(), "Nothing to prune for the segment");
271 }
272 }
273
274 Ok((stats, pruned, output))
275 }
276
277 pub fn is_pruning_needed(&self, tip_block_number: BlockNumber) -> bool {
281 let Some(tip_block_number) =
282 self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
283 else {
284 return false
285 };
286
287 if tip_block_number.saturating_sub(self.previous_tip_block_number.unwrap_or_default()) >=
291 self.min_block_interval as u64
292 {
293 debug!(
294 target: "pruner",
295 previous_tip_block_number = ?self.previous_tip_block_number,
296 %tip_block_number,
297 "Minimum pruning interval reached"
298 );
299 true
300 } else {
301 false
302 }
303 }
304
305 fn adjust_tip_block_number_to_finished_exex_height(
313 &self,
314 tip_block_number: BlockNumber,
315 ) -> Option<BlockNumber> {
316 match *self.finished_exex_height.borrow() {
317 FinishedExExHeight::NoExExs => Some(tip_block_number),
318 FinishedExExHeight::NotReady => {
319 debug!(target: "pruner", %tip_block_number, "Not all ExExs have emitted a `FinishedHeight` event yet, can't prune");
320 None
321 }
322 FinishedExExHeight::Height(finished_exex_height) => {
323 debug!(target: "pruner", %tip_block_number, %finished_exex_height, "Adjusting tip block number to the finished ExEx height");
324 Some(finished_exex_height)
325 }
326 }
327 }
328}
329
330impl<PF> Pruner<PF::ProviderRW, PF>
331where
332 PF: DatabaseProviderFactory<
333 ProviderRW: PruneCheckpointWriter + PruneCheckpointReader + StageCheckpointReader,
334 >,
335{
336 #[instrument(name = "Pruner::run", level = "debug", target = "pruner", skip(self))]
342 pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
343 let provider = self.provider_factory.database_provider_rw()?;
344 let result = self.run_with_provider(&provider, tip_block_number);
345 provider.commit()?;
346 result
347 }
348}
349
350fn is_stage_finished<Provider: StageCheckpointReader>(
354 provider: &Provider,
355 stage_id: StageId,
356) -> Result<bool, PrunerError> {
357 let stage_checkpoint = provider.get_stage_checkpoint(stage_id)?.map(|c| c.block_number);
358 let finish_checkpoint = provider.get_stage_checkpoint(StageId::Finish)?.map(|c| c.block_number);
359
360 Ok(stage_checkpoint >= finish_checkpoint)
361}
362
363#[cfg(test)]
364mod tests {
365 use crate::Pruner;
366 use reth_exex_types::FinishedExExHeight;
367 use reth_provider::test_utils::create_test_provider_factory;
368
369 #[test]
370 fn is_pruning_needed() {
371 let provider_factory = create_test_provider_factory();
372
373 let (finished_exex_height_tx, finished_exex_height_rx) =
374 tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
375
376 let mut pruner =
377 Pruner::new_with_factory(provider_factory, vec![], 5, 0, None, finished_exex_height_rx);
378
379 let first_block_number = 1;
381 assert!(!pruner.is_pruning_needed(first_block_number));
382 pruner.previous_tip_block_number = Some(first_block_number);
383
384 let second_block_number = first_block_number + pruner.min_block_interval as u64;
386 assert!(pruner.is_pruning_needed(second_block_number));
387 pruner.previous_tip_block_number = Some(second_block_number);
388
389 assert!(!pruner.is_pruning_needed(second_block_number));
391
392 let third_block_number = second_block_number + pruner.min_block_interval as u64;
394 assert!(pruner.is_pruning_needed(third_block_number));
395
396 finished_exex_height_tx.send(FinishedExExHeight::NotReady).unwrap();
398 assert!(!pruner.is_pruning_needed(third_block_number));
399
400 finished_exex_height_tx.send(FinishedExExHeight::Height(second_block_number)).unwrap();
402 assert!(!pruner.is_pruning_needed(third_block_number));
403
404 finished_exex_height_tx.send(FinishedExExHeight::Height(third_block_number)).unwrap();
406 assert!(pruner.is_pruning_needed(third_block_number));
407 }
408}