1use crate::{
4 segments::{PruneInput, Segment},
5 Metrics, PruneLimiter, PrunerError, PrunerEvent,
6};
7use alloy_primitives::BlockNumber;
8use reth_exex_types::FinishedExExHeight;
9use reth_provider::{
10 DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
11};
12use reth_prune_types::{PruneProgress, PrunedSegmentInfo, PrunerOutput};
13use reth_tokio_util::{EventSender, EventStream};
14use std::time::{Duration, Instant};
15use tokio::sync::watch;
16use tracing::debug;
17
18pub type PrunerResult = Result<PrunerOutput, PrunerError>;
20
21pub type PrunerWithResult<S, DB> = (Pruner<S, DB>, PrunerResult);
23
24pub type PrunerWithFactory<PF> = Pruner<<PF as DatabaseProviderFactory>::ProviderRW, PF>;
26
27#[derive(Debug)]
29pub struct Pruner<Provider, PF> {
30 provider_factory: PF,
32 segments: Vec<Box<dyn Segment<Provider>>>,
33 min_block_interval: usize,
36 previous_tip_block_number: Option<BlockNumber>,
40 delete_limit: usize,
42 timeout: Option<Duration>,
44 finished_exex_height: watch::Receiver<FinishedExExHeight>,
46 #[doc(hidden)]
47 metrics: Metrics,
48 event_sender: EventSender<PrunerEvent>,
49}
50
51impl<Provider> Pruner<Provider, ()> {
52 pub fn new(
54 segments: Vec<Box<dyn Segment<Provider>>>,
55 min_block_interval: usize,
56 delete_limit: usize,
57 timeout: Option<Duration>,
58 finished_exex_height: watch::Receiver<FinishedExExHeight>,
59 ) -> Self {
60 Self {
61 provider_factory: (),
62 segments,
63 min_block_interval,
64 previous_tip_block_number: None,
65 delete_limit,
66 timeout,
67 finished_exex_height,
68 metrics: Metrics::default(),
69 event_sender: Default::default(),
70 }
71 }
72}
73
74impl<PF> Pruner<PF::ProviderRW, PF>
75where
76 PF: DatabaseProviderFactory,
77{
78 pub fn new_with_factory(
80 provider_factory: PF,
81 segments: Vec<Box<dyn Segment<PF::ProviderRW>>>,
82 min_block_interval: usize,
83 delete_limit: usize,
84 timeout: Option<Duration>,
85 finished_exex_height: watch::Receiver<FinishedExExHeight>,
86 ) -> Self {
87 Self {
88 provider_factory,
89 segments,
90 min_block_interval,
91 previous_tip_block_number: None,
92 delete_limit,
93 timeout,
94 finished_exex_height,
95 metrics: Metrics::default(),
96 event_sender: Default::default(),
97 }
98 }
99}
100
101impl<Provider, S> Pruner<Provider, S>
102where
103 Provider: PruneCheckpointReader + PruneCheckpointWriter,
104{
105 pub fn events(&self) -> EventStream<PrunerEvent> {
107 self.event_sender.new_listener()
108 }
109
110 pub fn run_with_provider(
116 &mut self,
117 provider: &Provider,
118 tip_block_number: BlockNumber,
119 ) -> PrunerResult {
120 let Some(tip_block_number) =
121 self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
122 else {
123 return Ok(PruneProgress::Finished.into())
124 };
125 if tip_block_number == 0 {
126 self.previous_tip_block_number = Some(tip_block_number);
127
128 debug!(target: "pruner", %tip_block_number, "Nothing to prune yet");
129 return Ok(PruneProgress::Finished.into())
130 }
131
132 self.event_sender.notify(PrunerEvent::Started { tip_block_number });
133
134 debug!(target: "pruner", %tip_block_number, "Pruner started");
135 let start = Instant::now();
136
137 let mut limiter = PruneLimiter::default().set_deleted_entries_limit(self.delete_limit);
138 if let Some(timeout) = self.timeout {
139 limiter = limiter.set_time_limit(timeout);
140 };
141
142 let (stats, deleted_entries, output) =
143 self.prune_segments(provider, tip_block_number, &mut limiter)?;
144
145 self.previous_tip_block_number = Some(tip_block_number);
146
147 let elapsed = start.elapsed();
148 self.metrics.duration_seconds.record(elapsed);
149
150 let message = match output.progress {
151 PruneProgress::HasMoreData(_) => "Pruner interrupted and has more data to prune",
152 PruneProgress::Finished => "Pruner finished",
153 };
154
155 debug!(
156 target: "pruner",
157 %tip_block_number,
158 ?elapsed,
159 ?deleted_entries,
160 ?limiter,
161 ?output,
162 ?stats,
163 "{message}",
164 );
165
166 self.event_sender.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats });
167
168 Ok(output)
169 }
170
171 fn prune_segments(
178 &mut self,
179 provider: &Provider,
180 tip_block_number: BlockNumber,
181 limiter: &mut PruneLimiter,
182 ) -> Result<(Vec<PrunedSegmentInfo>, usize, PrunerOutput), PrunerError> {
183 let mut stats = Vec::with_capacity(self.segments.len());
184 let mut pruned = 0;
185 let mut output = PrunerOutput {
186 progress: PruneProgress::Finished,
187 segments: Vec::with_capacity(self.segments.len()),
188 };
189
190 for segment in &self.segments {
191 if limiter.is_limit_reached() {
192 break
193 }
194
195 if let Some((to_block, prune_mode)) = segment
196 .mode()
197 .map(|mode| {
198 mode.prune_target_block(tip_block_number, segment.segment(), segment.purpose())
199 })
200 .transpose()?
201 .flatten()
202 {
203 debug!(
204 target: "pruner",
205 segment = ?segment.segment(),
206 purpose = ?segment.purpose(),
207 %to_block,
208 ?prune_mode,
209 "Segment pruning started"
210 );
211
212 let segment_start = Instant::now();
213 let previous_checkpoint = provider.get_prune_checkpoint(segment.segment())?;
214 let segment_output = segment.prune(
215 provider,
216 PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() },
217 )?;
218 if let Some(checkpoint) = segment_output.checkpoint {
219 segment
220 .save_checkpoint(provider, checkpoint.as_prune_checkpoint(prune_mode))?;
221 }
222 self.metrics
223 .get_prune_segment_metrics(segment.segment())
224 .duration_seconds
225 .record(segment_start.elapsed());
226 if let Some(highest_pruned_block) =
227 segment_output.checkpoint.and_then(|checkpoint| checkpoint.block_number)
228 {
229 self.metrics
230 .get_prune_segment_metrics(segment.segment())
231 .highest_pruned_block
232 .set(highest_pruned_block as f64);
233 }
234
235 output.progress = segment_output.progress;
236 output.segments.push((segment.segment(), segment_output));
237
238 debug!(
239 target: "pruner",
240 segment = ?segment.segment(),
241 purpose = ?segment.purpose(),
242 %to_block,
243 ?prune_mode,
244 %segment_output.pruned,
245 "Segment pruning finished"
246 );
247
248 if segment_output.pruned > 0 {
249 limiter.increment_deleted_entries_count_by(segment_output.pruned);
250 pruned += segment_output.pruned;
251 let info = PrunedSegmentInfo {
252 segment: segment.segment(),
253 pruned: segment_output.pruned,
254 progress: segment_output.progress,
255 };
256 stats.push(info);
257 }
258 } else {
259 debug!(target: "pruner", segment = ?segment.segment(), purpose = ?segment.purpose(), "Nothing to prune for the segment");
260 }
261 }
262
263 Ok((stats, pruned, output))
264 }
265
266 pub fn is_pruning_needed(&self, tip_block_number: BlockNumber) -> bool {
270 let Some(tip_block_number) =
271 self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
272 else {
273 return false
274 };
275
276 if tip_block_number.saturating_sub(self.previous_tip_block_number.unwrap_or_default()) >=
280 self.min_block_interval as u64
281 {
282 debug!(
283 target: "pruner",
284 previous_tip_block_number = ?self.previous_tip_block_number,
285 %tip_block_number,
286 "Minimum pruning interval reached"
287 );
288 true
289 } else {
290 false
291 }
292 }
293
294 fn adjust_tip_block_number_to_finished_exex_height(
302 &self,
303 tip_block_number: BlockNumber,
304 ) -> Option<BlockNumber> {
305 match *self.finished_exex_height.borrow() {
306 FinishedExExHeight::NoExExs => Some(tip_block_number),
307 FinishedExExHeight::NotReady => {
308 debug!(target: "pruner", %tip_block_number, "Not all ExExs have emitted a `FinishedHeight` event yet, can't prune");
309 None
310 }
311 FinishedExExHeight::Height(finished_exex_height) => {
312 debug!(target: "pruner", %tip_block_number, %finished_exex_height, "Adjusting tip block number to the finished ExEx height");
313 Some(finished_exex_height)
314 }
315 }
316 }
317}
318
319impl<PF> Pruner<PF::ProviderRW, PF>
320where
321 PF: DatabaseProviderFactory<ProviderRW: PruneCheckpointWriter + PruneCheckpointReader>,
322{
323 pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
329 let provider = self.provider_factory.database_provider_rw()?;
330 let result = self.run_with_provider(&provider, tip_block_number);
331 provider.commit()?;
332 result
333 }
334}
335
#[cfg(test)]
mod tests {
    use crate::Pruner;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;

    /// Checks `Pruner::is_pruning_needed` against the minimum block interval and against every
    /// `FinishedExExHeight` state.
    #[test]
    fn is_pruning_needed() {
        let provider_factory = create_test_provider_factory();

        let (exex_height_tx, exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        // No segments, a minimum interval of 5 blocks, a delete limit of 0 and no timeout.
        let mut pruner =
            Pruner::new_with_factory(provider_factory, vec![], 5, 0, None, exex_height_rx);
        let interval = pruner.min_block_interval as u64;

        // No previous tip recorded: block 1 is less than one interval away from 0.
        let first_block_number = 1;
        assert!(!pruner.is_pruning_needed(first_block_number));
        pruner.previous_tip_block_number = Some(first_block_number);

        // The tip advanced by exactly one interval.
        let second_block_number = first_block_number + interval;
        assert!(pruner.is_pruning_needed(second_block_number));
        pruner.previous_tip_block_number = Some(second_block_number);

        // The tip did not move since the previous run.
        assert!(!pruner.is_pruning_needed(second_block_number));

        // The tip advanced by one more interval.
        let third_block_number = second_block_number + interval;
        assert!(pruner.is_pruning_needed(third_block_number));

        // ExExs are not ready: pruning is never needed.
        exex_height_tx.send(FinishedExExHeight::NotReady).unwrap();
        assert!(!pruner.is_pruning_needed(third_block_number));

        // The finished ExEx height equals the previous tip: the adjusted tip did not advance.
        exex_height_tx.send(FinishedExExHeight::Height(second_block_number)).unwrap();
        assert!(!pruner.is_pruning_needed(third_block_number));

        // The finished ExEx height caught up with the tip: one full interval again.
        exex_height_tx.send(FinishedExExHeight::Height(third_block_number)).unwrap();
        assert!(pruner.is_pruning_needed(third_block_number));
    }
}