1mod receipts;
2mod set;
3mod user;
4
5use crate::{PruneLimiter, PrunerError};
6use alloy_primitives::{BlockNumber, TxNumber};
7use reth_provider::{
8 errors::provider::ProviderResult, BlockReader, PruneCheckpointWriter, StaticFileProviderFactory,
9};
10use reth_prune_types::{
11 PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput,
12 SegmentOutputCheckpoint,
13};
14use reth_stages_types::StageId;
15use reth_static_file_types::StaticFileSegment;
16pub use set::SegmentSet;
17use std::{fmt::Debug, ops::RangeInclusive};
18use tracing::error;
19pub use user::{
20 AccountHistory, Bodies, Receipts as UserReceipts, ReceiptsByLogs, SenderRecovery,
21 StorageHistory, TransactionLookup,
22};
23
24pub(crate) fn prune_static_files<Provider>(
33 provider: &Provider,
34 input: PruneInput,
35 segment: StaticFileSegment,
36) -> Result<SegmentOutput, PrunerError>
37where
38 Provider: StaticFileProviderFactory,
39{
40 let deleted_headers =
41 provider.static_file_provider().delete_segment_below_block(segment, input.to_block + 1)?;
42
43 if deleted_headers.is_empty() {
44 return Ok(SegmentOutput {
45 progress: PruneProgress::Finished,
46 pruned: 0,
47 checkpoint: input
48 .previous_checkpoint
49 .map(SegmentOutputCheckpoint::from_prune_checkpoint),
50 })
51 }
52
53 let tx_ranges = deleted_headers.iter().filter_map(|header| header.tx_range());
54
55 let pruned = tx_ranges.clone().map(|range| range.len()).sum::<u64>() as usize;
56
57 let checkpoint_block = deleted_headers
59 .iter()
60 .filter_map(|header| header.block_range())
61 .map(|range| range.end())
62 .max();
63
64 Ok(SegmentOutput {
65 progress: PruneProgress::Finished,
66 pruned,
67 checkpoint: Some(SegmentOutputCheckpoint {
68 block_number: checkpoint_block,
69 tx_number: tx_ranges.map(|range| range.end()).max(),
70 }),
71 })
72}
73
74pub(crate) fn delete_static_files_segment<Provider>(
79 provider: &Provider,
80 input: PruneInput,
81 segment: StaticFileSegment,
82) -> Result<SegmentOutput, PrunerError>
83where
84 Provider: StaticFileProviderFactory,
85{
86 let deleted_headers = provider.static_file_provider().delete_segment(segment)?;
87
88 if deleted_headers.is_empty() {
89 return Ok(SegmentOutput::done())
90 }
91
92 let tx_ranges = deleted_headers.iter().filter_map(|header| header.tx_range());
93
94 let pruned = tx_ranges.clone().map(|range| range.len()).sum::<u64>() as usize;
95
96 Ok(SegmentOutput {
97 progress: PruneProgress::Finished,
98 pruned,
99 checkpoint: Some(SegmentOutputCheckpoint {
100 block_number: Some(input.to_block),
101 tx_number: tx_ranges.map(|range| range.end()).max(),
102 }),
103 })
104}
105
106pub trait Segment<Provider>: Debug + Send + Sync {
114 fn segment(&self) -> PruneSegment;
116
117 fn mode(&self) -> Option<PruneMode>;
119
120 fn purpose(&self) -> PrunePurpose;
122
123 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError>;
125
126 fn save_checkpoint(
128 &self,
129 provider: &Provider,
130 checkpoint: PruneCheckpoint,
131 ) -> ProviderResult<()>
132 where
133 Provider: PruneCheckpointWriter,
134 {
135 provider.save_prune_checkpoint(self.segment(), checkpoint)
136 }
137
138 fn required_stage(&self) -> Option<StageId> {
143 None
144 }
145}
146
147#[derive(Debug)]
149#[cfg_attr(test, derive(Clone))]
150pub struct PruneInput {
151 pub(crate) previous_checkpoint: Option<PruneCheckpoint>,
152 pub(crate) to_block: BlockNumber,
154 pub(crate) limiter: PruneLimiter,
156}
157
158impl PruneInput {
159 pub(crate) fn get_next_tx_num_range<Provider: BlockReader>(
168 &self,
169 provider: &Provider,
170 ) -> ProviderResult<Option<RangeInclusive<TxNumber>>> {
171 let from_tx_number = self.previous_checkpoint
172 .and_then(|checkpoint| match checkpoint.tx_number {
174 Some(tx_number) => Some(tx_number + 1),
175 _ => {
176 error!(target: "pruner", ?checkpoint, "Expected transaction number in prune checkpoint, found None");
177 None
178 },
179 })
180 .unwrap_or_default();
182
183 let to_tx_number = match provider.block_body_indices(self.to_block)? {
184 Some(body) => {
185 let last_tx = body.last_tx_num();
186 if last_tx + body.tx_count() == 0 {
187 return Ok(None)
191 }
192 last_tx
193 }
194 None => return Ok(None),
195 };
196
197 let range = from_tx_number..=to_tx_number;
198 if range.is_empty() {
199 return Ok(None)
200 }
201
202 Ok(Some(range))
203 }
204
205 pub(crate) fn get_next_block_range(&self) -> Option<RangeInclusive<BlockNumber>> {
214 let from_block = self.get_start_next_block_range();
215 let range = from_block..=self.to_block;
216 if range.is_empty() {
217 return None
218 }
219
220 Some(range)
221 }
222
223 pub(crate) fn get_start_next_block_range(&self) -> u64 {
228 self.previous_checkpoint
229 .and_then(|checkpoint| checkpoint.block_number)
230 .map(|block_number| block_number + 1)
232 .unwrap_or(0)
234 }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use reth_provider::{
        providers::BlockchainProvider,
        test_utils::{create_test_provider_factory, MockEthProvider},
        BlockWriter,
    };
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    /// Builds a [`PruneInput`] that targets block 10 with the default limiter.
    fn prune_input(previous_checkpoint: Option<PruneCheckpoint>) -> PruneInput {
        PruneInput { previous_checkpoint, to_block: 10, limiter: PruneLimiter::default() }
    }

    /// Generates random blocks `0..=10` with `params`, inserts and commits them into a fresh
    /// test database, and returns the total number of generated transactions together with a
    /// provider over that database.
    fn provider_with_random_blocks(params: BlockRangeParams) -> (u64, impl BlockReader) {
        let mut rng = generators::rng();
        let factory = create_test_provider_factory();

        let blocks = random_block_range(&mut rng, 0..=10, params);

        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
        for block in &blocks {
            provider_rw
                .insert_block(
                    &block.clone().try_recover().expect("failed to seal block with senders"),
                )
                .expect("failed to insert block");
        }
        provider_rw.commit().expect("failed to commit");

        let provider = BlockchainProvider::new(factory).unwrap();
        let num_txs = blocks.iter().map(|block| block.transaction_count() as u64).sum::<u64>();

        (num_txs, provider)
    }

    /// The provider knows nothing about `to_block`, so no range can be produced.
    #[test]
    fn test_prune_input_get_next_tx_num_range_no_to_block() {
        let input = prune_input(None);
        let provider = MockEthProvider::default();

        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
        assert!(range.is_none());
    }

    /// All blocks exist but none of them contains a transaction.
    #[test]
    fn test_prune_input_get_next_tx_num_range_no_tx() {
        let input = prune_input(None);
        let (_, provider) = provider_with_random_blocks(BlockRangeParams {
            parent: Some(B256::ZERO),
            tx_count: 0..1,
            ..Default::default()
        });

        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
        assert!(range.is_none());
    }

    /// Without a checkpoint the range covers every generated transaction.
    #[test]
    fn test_prune_input_get_next_tx_num_range_valid() {
        let input = prune_input(None);
        let (num_txs, provider) = provider_with_random_blocks(BlockRangeParams {
            parent: Some(B256::ZERO),
            tx_count: 0..5,
            ..Default::default()
        });

        let range = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap();

        assert_eq!(range, 0..=num_txs - 1);
    }

    /// A checkpoint lacking a transaction number makes the range start from `0` again.
    #[test]
    fn test_prune_input_get_next_tx_checkpoint_without_tx_number() {
        let input = prune_input(Some(PruneCheckpoint {
            block_number: Some(5),
            tx_number: None,
            prune_mode: PruneMode::Full,
        }));
        let (num_txs, provider) = provider_with_random_blocks(BlockRangeParams {
            parent: Some(B256::ZERO),
            tx_count: 0..5,
            ..Default::default()
        });

        let range = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap();

        assert_eq!(range, 0..=num_txs - 1,);
    }

    /// A checkpoint already at the last transaction leaves nothing to process.
    #[test]
    fn test_prune_input_get_next_tx_empty_range() {
        let (num_txs, provider) = provider_with_random_blocks(BlockRangeParams {
            parent: Some(B256::ZERO),
            tx_count: 0..5,
            ..Default::default()
        });

        // Highest transaction number that exists in the generated blocks.
        let max_range = num_txs - 1;

        let input = prune_input(Some(PruneCheckpoint {
            block_number: Some(5),
            tx_number: Some(max_range),
            prune_mode: PruneMode::Full,
        }));

        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
        assert!(range.is_none());
    }
}