reth_prune/segments/user/
bodies.rs1use crate::{
2 segments::{self, PruneInput, Segment},
3 PrunerError,
4};
5use reth_provider::{BlockReader, StaticFileProviderFactory};
6use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutput};
7use reth_static_file_types::StaticFileSegment;
8
9#[derive(Debug)]
13pub struct Bodies {
14 mode: PruneMode,
15}
16
17impl Bodies {
18 pub const fn new(mode: PruneMode) -> Self {
20 Self { mode }
21 }
22}
23
24impl<Provider> Segment<Provider> for Bodies
25where
26 Provider: StaticFileProviderFactory + BlockReader,
27{
28 fn segment(&self) -> PruneSegment {
29 PruneSegment::Bodies
30 }
31
32 fn mode(&self) -> Option<PruneMode> {
33 Some(self.mode)
34 }
35
36 fn purpose(&self) -> PrunePurpose {
37 PrunePurpose::User
38 }
39
40 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
41 segments::prune_static_files(provider, input, StaticFileSegment::Transactions)
42 }
43}
44
45#[cfg(test)]
46mod tests {
47 use super::*;
48 use crate::Pruner;
49 use alloy_primitives::BlockNumber;
50 use reth_exex_types::FinishedExExHeight;
51 use reth_provider::{
52 test_utils::{create_test_provider_factory, MockNodeTypesWithDB},
53 ProviderFactory, StaticFileWriter,
54 };
55 use reth_prune_types::{PruneMode, PruneProgress, PruneSegment};
56 use reth_static_file_types::{
57 SegmentHeader, SegmentRangeInclusive, StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
58 };
59
60 fn setup_static_file_jars<P: StaticFileProviderFactory>(provider: &P, tip_block: u64) {
64 let num_jars = (tip_block + 1) / DEFAULT_BLOCKS_PER_STATIC_FILE;
65 let txs_per_jar = 1000;
66 let static_file_provider = provider.static_file_provider();
67
68 let mut writer =
69 static_file_provider.latest_writer(StaticFileSegment::Transactions).unwrap();
70
71 for jar_idx in 0..num_jars {
72 let block_start = jar_idx * DEFAULT_BLOCKS_PER_STATIC_FILE;
73 let block_end = ((jar_idx + 1) * DEFAULT_BLOCKS_PER_STATIC_FILE - 1).min(tip_block);
74
75 let tx_start = jar_idx * txs_per_jar;
76 let tx_end = tx_start + txs_per_jar - 1;
77
78 *writer.user_header_mut() = SegmentHeader::new(
79 SegmentRangeInclusive::new(block_start, block_end),
80 Some(SegmentRangeInclusive::new(block_start, block_end)),
81 Some(SegmentRangeInclusive::new(tx_start, tx_end)),
82 StaticFileSegment::Transactions,
83 );
84
85 writer.inner().set_dirty();
86 writer.commit().expect("commit empty jar");
87
88 if jar_idx < num_jars - 1 {
89 writer.increment_block(block_end + 1).expect("increment block");
90 }
91 }
92
93 static_file_provider.initialize_index().expect("initialize index");
94 }
95
96 struct PruneTestCase {
97 prune_mode: PruneMode,
98 expected_pruned: usize,
99 expected_lowest_block: Option<BlockNumber>,
100 }
101
102 fn run_prune_test(
103 factory: &ProviderFactory<MockNodeTypesWithDB>,
104 finished_exex_height_rx: &tokio::sync::watch::Receiver<FinishedExExHeight>,
105 test_case: PruneTestCase,
106 tip: BlockNumber,
107 ) {
108 let bodies = Bodies::new(test_case.prune_mode);
109 let segments: Vec<Box<dyn Segment<_>>> = vec![Box::new(bodies)];
110
111 let mut pruner = Pruner::new_with_factory(
112 factory.clone(),
113 segments,
114 5,
115 10000,
116 None,
117 finished_exex_height_rx.clone(),
118 );
119
120 let result = pruner.run(tip).expect("pruner run");
121
122 assert_eq!(result.progress, PruneProgress::Finished);
123 assert_eq!(result.segments.len(), 1);
124
125 let (segment, output) = &result.segments[0];
126 assert_eq!(*segment, PruneSegment::Bodies);
127 assert_eq!(output.pruned, test_case.expected_pruned);
128
129 let static_provider = factory.static_file_provider();
130 assert_eq!(
131 static_provider.get_lowest_range_end(StaticFileSegment::Transactions),
132 test_case.expected_lowest_block
133 );
134 assert_eq!(
135 static_provider.get_highest_static_file_block(StaticFileSegment::Transactions),
136 Some(tip)
137 );
138 }
139
140 #[test]
141 fn bodies_prune_through_pruner() {
142 let factory = create_test_provider_factory();
143 let tip = 2_499_999;
144 setup_static_file_jars(&factory, tip);
145
146 let (_, finished_exex_height_rx) = tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
147
148 let test_cases = vec![
149 PruneTestCase {
151 prune_mode: PruneMode::Before(750_000),
152 expected_pruned: 1000,
153 expected_lowest_block: Some(999_999),
154 },
155 PruneTestCase {
158 prune_mode: PruneMode::Before(850_000),
159 expected_pruned: 0,
160 expected_lowest_block: Some(999_999),
161 },
162 PruneTestCase {
165 prune_mode: PruneMode::Before(1_599_999),
166 expected_pruned: 2000,
167 expected_lowest_block: Some(1_999_999),
168 },
169 PruneTestCase {
172 prune_mode: PruneMode::Distance(500_000),
173 expected_pruned: 1000,
174 expected_lowest_block: Some(2_499_999),
175 },
176 PruneTestCase {
179 prune_mode: PruneMode::Before(2_300_000),
180 expected_pruned: 0,
181 expected_lowest_block: Some(2_499_999),
182 },
183 ];
184
185 for test_case in test_cases {
186 run_prune_test(&factory, &finished_exex_height_rx, test_case, tip);
187 }
188 }
189
190 #[test]
191 fn min_block_updated_on_sync() {
192 struct MinBlockTestCase {
197 initial_range: Option<SegmentRangeInclusive>,
199 updated_range: SegmentRangeInclusive,
200 expected_before_update: Option<BlockNumber>,
202 expected_after_update: BlockNumber,
203 delete_below_block: BlockNumber,
205 expected_deleted: usize,
207 }
208
209 let test_cases = vec![
210 MinBlockTestCase {
212 initial_range: None,
213 updated_range: SegmentRangeInclusive::new(0, 100),
214 expected_before_update: None,
215 expected_after_update: 100,
216 delete_below_block: 1,
217 expected_deleted: 0,
218 },
219 MinBlockTestCase {
222 initial_range: Some(SegmentRangeInclusive::new(0, 0)),
223 updated_range: SegmentRangeInclusive::new(0, 100),
224 expected_before_update: Some(0),
225 expected_after_update: 100,
226 delete_below_block: 1,
227 expected_deleted: 0,
228 },
229 MinBlockTestCase {
231 initial_range: Some(SegmentRangeInclusive::new(0, 50)),
232 updated_range: SegmentRangeInclusive::new(0, 200),
233 expected_before_update: Some(50),
234 expected_after_update: 200,
235 delete_below_block: 150,
236 expected_deleted: 0,
237 },
238 ];
239
240 for (
241 idx,
242 MinBlockTestCase {
243 initial_range,
244 updated_range,
245 expected_before_update,
246 expected_after_update,
247 delete_below_block,
248 expected_deleted,
249 },
250 ) in test_cases.into_iter().enumerate()
251 {
252 let factory = create_test_provider_factory();
253 let static_provider = factory.static_file_provider();
254
255 let mut writer =
256 static_provider.latest_writer(StaticFileSegment::Transactions).unwrap();
257
258 if let Some(initial_range) = initial_range {
260 *writer.user_header_mut() = SegmentHeader::new(
261 static_provider
264 .find_fixed_range(StaticFileSegment::Transactions, initial_range.start()),
265 Some(initial_range),
266 Some(initial_range),
267 StaticFileSegment::Transactions,
268 );
269 writer.inner().set_dirty();
270 writer.commit().unwrap();
271 static_provider.initialize_index().unwrap();
272 }
273
274 assert_eq!(
276 static_provider.get_lowest_range_end(StaticFileSegment::Transactions),
277 expected_before_update,
278 "Test case {}: Initial min_block mismatch",
279 idx
280 );
281
282 writer.user_header_mut().set_block_range(updated_range.start(), updated_range.end());
284 writer.user_header_mut().set_tx_range(updated_range.start(), updated_range.end());
285 writer.inner().set_dirty();
286 writer.commit().unwrap(); assert_eq!(
290 static_provider.get_lowest_range_end(StaticFileSegment::Transactions),
291 Some(expected_after_update),
292 "Test case {}: min_block should be updated to {} (not stuck at stale value)",
293 idx,
294 expected_after_update
295 );
296
297 let deleted = static_provider
299 .delete_segment_below_block(StaticFileSegment::Transactions, delete_below_block)
300 .unwrap();
301
302 assert_eq!(deleted.len(), expected_deleted);
303 }
304 }
305}