1mod manager;
2pub use manager::{StaticFileAccess, StaticFileProvider, StaticFileWriter};
3
4mod jar;
5pub use jar::StaticFileJarProvider;
6
7mod writer;
8pub use writer::{StaticFileProviderRW, StaticFileProviderRWRefMut};
9
10mod metrics;
11use reth_nippy_jar::NippyJar;
12use reth_static_file_types::{SegmentHeader, StaticFileSegment};
13use reth_storage_errors::provider::{ProviderError, ProviderResult};
14use std::{ops::Deref, sync::Arc};
15
16type LoadedJarRef<'a> = dashmap::mapref::one::Ref<'a, (u64, StaticFileSegment), LoadedJar>;
18
19#[derive(Debug)]
21pub struct LoadedJar {
22 jar: NippyJar<SegmentHeader>,
23 mmap_handle: Arc<reth_nippy_jar::DataReader>,
24}
25
26impl LoadedJar {
27 fn new(jar: NippyJar<SegmentHeader>) -> ProviderResult<Self> {
28 match jar.open_data_reader() {
29 Ok(data_reader) => {
30 let mmap_handle = Arc::new(data_reader);
31 Ok(Self { jar, mmap_handle })
32 }
33 Err(e) => Err(ProviderError::other(e)),
34 }
35 }
36
37 fn mmap_handle(&self) -> Arc<reth_nippy_jar::DataReader> {
39 self.mmap_handle.clone()
40 }
41
42 const fn segment(&self) -> StaticFileSegment {
43 self.jar.user_header().segment()
44 }
45}
46
47impl Deref for LoadedJar {
48 type Target = NippyJar<SegmentHeader>;
49 fn deref(&self) -> &Self::Target {
50 &self.jar
51 }
52}
53
54#[cfg(test)]
55mod tests {
56 use super::*;
57 use crate::{
58 test_utils::create_test_provider_factory, HeaderProvider, StaticFileProviderFactory,
59 };
60 use alloy_consensus::{Header, SignableTransaction, Transaction, TxLegacy};
61 use alloy_primitives::{BlockHash, Signature, TxNumber, B256};
62 use rand::seq::SliceRandom;
63 use reth_db::test_utils::create_test_static_files_dir;
64 use reth_db_api::{transaction::DbTxMut, CanonicalHeaders, HeaderNumbers, Headers};
65 use reth_ethereum_primitives::{EthPrimitives, Receipt, TransactionSigned};
66 use reth_static_file_types::{
67 find_fixed_range, SegmentRangeInclusive, DEFAULT_BLOCKS_PER_STATIC_FILE,
68 };
69 use reth_storage_api::{ReceiptProvider, TransactionsProvider};
70 use reth_testing_utils::generators::{self, random_header_range};
71 use std::{fmt::Debug, fs, ops::Range, path::Path};
72
73 fn assert_eyre<T: PartialEq + Debug>(got: T, expected: T, msg: &str) -> eyre::Result<()> {
74 if got != expected {
75 eyre::bail!("{msg} | got: {got:?} expected: {expected:?}");
76 }
77 Ok(())
78 }
79
80 #[test]
81 fn test_snap() {
82 let row_count = 100u64;
84 let range = 0..=(row_count - 1);
85
86 let factory = create_test_provider_factory();
88 let static_files_path = tempfile::tempdir().unwrap();
89 let static_file = static_files_path.path().join(
90 StaticFileSegment::Headers
91 .filename(&find_fixed_range(*range.end(), DEFAULT_BLOCKS_PER_STATIC_FILE)),
92 );
93
94 let mut headers = random_header_range(
96 &mut generators::rng(),
97 *range.start()..(*range.end() + 1),
98 B256::random(),
99 );
100
101 let mut provider_rw = factory.provider_rw().unwrap();
102 let tx = provider_rw.tx_mut();
103 for header in headers.clone() {
104 let hash = header.hash();
105
106 tx.put::<CanonicalHeaders>(header.number, hash).unwrap();
107 tx.put::<Headers>(header.number, header.clone_header()).unwrap();
108 tx.put::<HeaderNumbers>(hash, header.number).unwrap();
109 }
110 provider_rw.commit().unwrap();
111
112 {
114 let manager = factory.static_file_provider();
115 let mut writer = manager.latest_writer(StaticFileSegment::Headers).unwrap();
116
117 for header in headers.clone() {
118 let hash = header.hash();
119 writer.append_header(&header.unseal(), &hash).unwrap();
120 }
121 writer.commit().unwrap();
122 }
123
124 {
126 let db_provider = factory.provider().unwrap();
127 let manager = db_provider.static_file_provider();
128 let jar_provider = manager
129 .get_segment_provider_from_block(StaticFileSegment::Headers, 0, Some(&static_file))
130 .unwrap();
131
132 assert!(!headers.is_empty());
133
134 headers.shuffle(&mut generators::rng());
136
137 for header in headers {
138 let header_hash = header.hash();
139 let header = header.unseal();
140
141 assert_eq!(header, db_provider.header(header_hash).unwrap().unwrap());
143 assert_eq!(header, jar_provider.header_by_number(header.number).unwrap().unwrap());
144 }
145 }
146 }
147
148 #[test]
149 fn test_header_truncation() {
150 let (static_dir, _) = create_test_static_files_dir();
151
152 let blocks_per_file = 10; let files_per_range = 3; let file_set_count = 3; let initial_file_count = files_per_range * file_set_count;
156 let tip = blocks_per_file * file_set_count - 1; {
160 let sf_rw = StaticFileProvider::<EthPrimitives>::read_write(&static_dir)
161 .expect("Failed to create static file provider")
162 .with_custom_blocks_per_file(blocks_per_file);
163
164 let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap();
165
166 let mut header = Header::default();
168 for num in 0..=tip {
169 header.number = num;
170 header_writer.append_header(&header, &BlockHash::default()).unwrap();
171 }
172 header_writer.commit().unwrap();
173 }
174
175 fn prune_and_validate(
177 writer: &mut StaticFileProviderRWRefMut<'_, EthPrimitives>,
178 sf_rw: &StaticFileProvider<EthPrimitives>,
179 static_dir: impl AsRef<Path>,
180 prune_count: u64,
181 expected_tip: Option<u64>,
182 expected_file_count: u64,
183 ) -> eyre::Result<()> {
184 writer.prune_headers(prune_count)?;
185 writer.commit()?;
186
187 assert_eyre(
189 sf_rw.get_highest_static_file_block(StaticFileSegment::Headers),
190 expected_tip,
191 "block mismatch",
192 )?;
193
194 if let Some(id) = expected_tip {
195 assert_eyre(
196 sf_rw.header_by_number(id)?.map(|h| h.number),
197 expected_tip,
198 "header mismatch",
199 )?;
200 }
201
202 assert_eyre(
204 count_files_without_lockfile(static_dir)?,
205 expected_file_count as usize,
206 "file count mismatch",
207 )?;
208
209 Ok(())
210 }
211
212 type PruneCount = u64;
214 type ExpectedTip = u64;
215 type ExpectedFileCount = u64;
216 let mut tmp_tip = tip;
217 let test_cases: Vec<(PruneCount, Option<ExpectedTip>, ExpectedFileCount)> = vec![
218 {
220 tmp_tip -= 1;
221 (1, Some(tmp_tip), initial_file_count)
222 },
223 {
225 tmp_tip -= blocks_per_file - 1;
226 (blocks_per_file - 1, Some(tmp_tip), initial_file_count - files_per_range)
227 },
228 {
231 tmp_tip -= blocks_per_file + 1;
232 (blocks_per_file + 1, Some(tmp_tip), initial_file_count - files_per_range * 2)
233 },
234 {
236 (
237 tmp_tip,
238 Some(0), files_per_range, )
241 },
242 {
244 (
245 1,
246 None, files_per_range, )
249 },
250 ];
251
252 {
254 let sf_rw = StaticFileProvider::read_write(&static_dir)
255 .expect("Failed to create static file provider")
256 .with_custom_blocks_per_file(blocks_per_file);
257
258 assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(tip));
259 assert_eq!(
260 count_files_without_lockfile(static_dir.as_ref()).unwrap(),
261 initial_file_count as usize
262 );
263
264 let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap();
265
266 for (case, (prune_count, expected_tip, expected_file_count)) in
267 test_cases.into_iter().enumerate()
268 {
269 prune_and_validate(
270 &mut header_writer,
271 &sf_rw,
272 &static_dir,
273 prune_count,
274 expected_tip,
275 expected_file_count,
276 )
277 .map_err(|err| eyre::eyre!("Test case {case}: {err}"))
278 .unwrap();
279 }
280 }
281 }
282
283 fn setup_tx_based_scenario(
290 sf_rw: &StaticFileProvider<EthPrimitives>,
291 segment: StaticFileSegment,
292 blocks_per_file: u64,
293 ) {
294 fn setup_block_ranges(
295 writer: &mut StaticFileProviderRWRefMut<'_, EthPrimitives>,
296 sf_rw: &StaticFileProvider<EthPrimitives>,
297 segment: StaticFileSegment,
298 block_range: &Range<u64>,
299 mut tx_count: u64,
300 next_tx_num: &mut u64,
301 ) {
302 let mut receipt = Receipt::default();
303 let mut tx = TxLegacy::default();
304
305 for block in block_range.clone() {
306 writer.increment_block(block).unwrap();
307
308 if tx_count > 0 {
310 if segment.is_receipts() {
311 receipt.cumulative_gas_used = *next_tx_num;
313 writer.append_receipt(*next_tx_num, &receipt).unwrap();
314 } else {
315 tx.nonce = *next_tx_num;
317 let tx: TransactionSigned =
318 tx.clone().into_signed(Signature::test_signature()).into();
319 writer.append_transaction(*next_tx_num, &tx).unwrap();
320 }
321 *next_tx_num += 1;
322 tx_count -= 1;
323 }
324 }
325 writer.commit().unwrap();
326
327 let expected_block = block_range.end - 1;
329 let expected_tx = if tx_count == 0 { *next_tx_num - 1 } else { *next_tx_num };
330
331 assert_eq!(sf_rw.get_highest_static_file_block(segment), Some(expected_block),);
333 assert_eq!(sf_rw.get_highest_static_file_tx(segment), Some(expected_tx),);
334 }
335
336 let block_ranges = [
338 0..blocks_per_file,
339 blocks_per_file..blocks_per_file * 2,
340 blocks_per_file * 2..blocks_per_file * 3,
341 ];
342
343 let tx_counts = [
344 blocks_per_file - 1, 0, 1, ];
348
349 let mut writer = sf_rw.latest_writer(segment).unwrap();
350 let mut next_tx_num = 0;
351
352 for (block_range, tx_count) in block_ranges.iter().zip(tx_counts.iter()) {
354 setup_block_ranges(
355 &mut writer,
356 sf_rw,
357 segment,
358 block_range,
359 *tx_count,
360 &mut next_tx_num,
361 );
362 }
363
364 let expected_tx_ranges = vec![
366 Some(SegmentRangeInclusive::new(0, 8)),
367 None,
368 Some(SegmentRangeInclusive::new(9, 9)),
369 ];
370
371 block_ranges.iter().zip(expected_tx_ranges).for_each(|(block_range, expected_tx_range)| {
372 assert_eq!(
373 sf_rw
374 .get_segment_provider_from_block(segment, block_range.start, None)
375 .unwrap()
376 .user_header()
377 .tx_range(),
378 expected_tx_range.as_ref()
379 );
380 });
381
382 let tx_index = sf_rw.tx_index().read();
384 let expected_tx_index =
385 vec![(8, SegmentRangeInclusive::new(0, 9)), (9, SegmentRangeInclusive::new(20, 29))];
386 assert_eq!(
387 tx_index.get(&segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
388 (!expected_tx_index.is_empty()).then_some(expected_tx_index),
389 "tx index mismatch",
390 );
391 }
392
393 #[test]
394 fn test_tx_based_truncation() {
395 let segments = [StaticFileSegment::Transactions, StaticFileSegment::Receipts];
396 let blocks_per_file = 10; let files_per_range = 3; let file_set_count = 3; let initial_file_count = files_per_range * file_set_count;
400
401 #[expect(clippy::too_many_arguments)]
402 fn prune_and_validate(
403 sf_rw: &StaticFileProvider<EthPrimitives>,
404 static_dir: impl AsRef<Path>,
405 segment: StaticFileSegment,
406 prune_count: u64,
407 last_block: u64,
408 expected_tx_tip: Option<u64>,
409 expected_file_count: i32,
410 expected_tx_index: Vec<(TxNumber, SegmentRangeInclusive)>,
411 ) -> eyre::Result<()> {
412 let mut writer = sf_rw.latest_writer(segment)?;
413
414 if segment.is_receipts() {
416 writer.prune_receipts(prune_count, last_block)?;
417 } else {
418 writer.prune_transactions(prune_count, last_block)?;
419 }
420 writer.commit()?;
421
422 assert_eyre(
424 sf_rw.get_highest_static_file_block(segment),
425 Some(last_block),
426 "block mismatch",
427 )?;
428 assert_eyre(sf_rw.get_highest_static_file_tx(segment), expected_tx_tip, "tx mismatch")?;
429
430 if let Some(id) = expected_tx_tip {
433 if segment.is_receipts() {
434 assert_eyre(
435 expected_tx_tip,
436 sf_rw.receipt(id)?.map(|r| r.cumulative_gas_used),
437 "tx mismatch",
438 )?;
439 } else {
440 assert_eyre(
441 expected_tx_tip,
442 sf_rw.transaction_by_id(id)?.map(|t| t.nonce()),
443 "tx mismatch",
444 )?;
445 }
446 }
447
448 assert_eyre(
450 count_files_without_lockfile(static_dir)?,
451 expected_file_count as usize,
452 "file count mismatch",
453 )?;
454
455 let tx_index = sf_rw.tx_index().read();
457 assert_eyre(
458 tx_index.get(&segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
459 (!expected_tx_index.is_empty()).then_some(expected_tx_index),
460 "tx index mismatch",
461 )?;
462
463 Ok(())
464 }
465
466 for segment in segments {
467 let (static_dir, _) = create_test_static_files_dir();
468
469 let sf_rw = StaticFileProvider::read_write(&static_dir)
470 .expect("Failed to create static file provider")
471 .with_custom_blocks_per_file(blocks_per_file);
472
473 setup_tx_based_scenario(&sf_rw, segment, blocks_per_file);
474
475 let sf_rw = StaticFileProvider::read_write(&static_dir)
476 .expect("Failed to create static file provider")
477 .with_custom_blocks_per_file(blocks_per_file);
478 let highest_tx = sf_rw.get_highest_static_file_tx(segment).unwrap();
479
480 let test_cases = vec![
483 (
488 1,
489 blocks_per_file * 2,
490 Some(highest_tx - 1),
491 initial_file_count,
492 vec![(highest_tx - 1, SegmentRangeInclusive::new(0, 9))],
493 ),
494 (
498 0,
499 blocks_per_file - 1,
500 Some(highest_tx - 1),
501 files_per_range,
502 vec![(highest_tx - 1, SegmentRangeInclusive::new(0, 9))],
503 ),
504 (
506 highest_tx - 1,
507 1,
508 Some(0),
509 files_per_range,
510 vec![(0, SegmentRangeInclusive::new(0, 1))],
511 ),
512 (1, 0, None, files_per_range, vec![]),
514 ];
515
516 for (
518 case,
519 (prune_count, last_block, expected_tx_tip, expected_file_count, expected_tx_index),
520 ) in test_cases.into_iter().enumerate()
521 {
522 prune_and_validate(
523 &sf_rw,
524 &static_dir,
525 segment,
526 prune_count,
527 last_block,
528 expected_tx_tip,
529 expected_file_count,
530 expected_tx_index,
531 )
532 .map_err(|err| eyre::eyre!("Test case {case}: {err}"))
533 .unwrap();
534 }
535 }
536 }
537
538 fn count_files_without_lockfile(path: impl AsRef<Path>) -> eyre::Result<usize> {
540 let is_lockfile = |entry: &fs::DirEntry| {
541 entry.path().file_name().map(|name| name == "lock").unwrap_or(false)
542 };
543 let count = fs::read_dir(path)?
544 .filter_map(|entry| entry.ok())
545 .filter(|entry| !is_lockfile(entry))
546 .count();
547
548 Ok(count)
549 }
550}