1mod manager;
2pub use manager::{
3 StaticFileAccess, StaticFileProvider, StaticFileProviderBuilder, StaticFileWriter,
4};
5
6mod jar;
7pub use jar::StaticFileJarProvider;
8
9mod writer;
10pub use writer::{StaticFileProviderRW, StaticFileProviderRWRefMut};
11
12mod metrics;
13use reth_nippy_jar::NippyJar;
14use reth_static_file_types::{SegmentHeader, StaticFileSegment};
15use reth_storage_errors::provider::{ProviderError, ProviderResult};
16use std::{ops::Deref, sync::Arc};
17
/// Shorthand for a `dashmap` reference guard to a [`LoadedJar`] entry stored under a
/// `(u64, StaticFileSegment)` key.
// NOTE(review): the `u64` half of the key is presumably a block number identifying the jar's
// range; confirm against the map that hands out these references.
type LoadedJarRef<'a> = dashmap::mapref::one::Ref<'a, (u64, StaticFileSegment), LoadedJar>;
20
/// A [`NippyJar`] bundled with a shared handle to its opened data reader.
///
/// The type dereferences to the wrapped jar.
#[derive(Debug)]
pub struct LoadedJar {
    /// The wrapped jar, carrying a [`SegmentHeader`] as its user header.
    jar: NippyJar<SegmentHeader>,
    /// Reference-counted data reader opened from `jar` when the value is constructed.
    mmap_handle: Arc<reth_nippy_jar::DataReader>,
}
27
28impl LoadedJar {
29 fn new(jar: NippyJar<SegmentHeader>) -> ProviderResult<Self> {
30 match jar.open_data_reader() {
31 Ok(data_reader) => {
32 let mmap_handle = Arc::new(data_reader);
33 Ok(Self { jar, mmap_handle })
34 }
35 Err(e) => Err(ProviderError::other(e)),
36 }
37 }
38
39 fn mmap_handle(&self) -> Arc<reth_nippy_jar::DataReader> {
41 self.mmap_handle.clone()
42 }
43
44 const fn segment(&self) -> StaticFileSegment {
45 self.jar.user_header().segment()
46 }
47}
48
49impl Deref for LoadedJar {
50 type Target = NippyJar<SegmentHeader>;
51 fn deref(&self) -> &Self::Target {
52 &self.jar
53 }
54}
55
56#[cfg(test)]
57mod tests {
58 use super::*;
59 use crate::{
60 providers::static_file::manager::StaticFileProviderBuilder,
61 test_utils::create_test_provider_factory, HeaderProvider, StaticFileProviderFactory,
62 };
63 use alloy_consensus::{Header, SignableTransaction, Transaction, TxLegacy};
64 use alloy_primitives::{Address, BlockHash, Signature, TxNumber, B256, U160};
65 use rand::seq::SliceRandom;
66 use reth_db::test_utils::create_test_static_files_dir;
67 use reth_db_api::{transaction::DbTxMut, CanonicalHeaders, HeaderNumbers, Headers};
68 use reth_ethereum_primitives::{EthPrimitives, Receipt, TransactionSigned};
69 use reth_static_file_types::{
70 find_fixed_range, SegmentRangeInclusive, DEFAULT_BLOCKS_PER_STATIC_FILE,
71 };
72 use reth_storage_api::{ReceiptProvider, TransactionsProvider};
73 use reth_testing_utils::generators::{self, random_header_range};
74 use std::{collections::BTreeMap, fmt::Debug, fs, ops::Range, path::Path};
75
76 fn assert_eyre<T: PartialEq + Debug>(got: T, expected: T, msg: &str) -> eyre::Result<()> {
77 if got != expected {
78 eyre::bail!("{msg} | got: {got:?} expected: {expected:?}");
79 }
80 Ok(())
81 }
82
83 #[test]
84 fn test_static_files() {
85 let row_count = 100u64;
87 let range = 0..=(row_count - 1);
88
89 let factory = create_test_provider_factory();
91 let static_files_path = tempfile::tempdir().unwrap();
92 let static_file = static_files_path.path().join(
93 StaticFileSegment::Headers
94 .filename(&find_fixed_range(*range.end(), DEFAULT_BLOCKS_PER_STATIC_FILE)),
95 );
96
97 let mut headers = random_header_range(
99 &mut generators::rng(),
100 *range.start()..(*range.end() + 1),
101 B256::random(),
102 );
103
104 let mut provider_rw = factory.provider_rw().unwrap();
105 let tx = provider_rw.tx_mut();
106 for header in headers.clone() {
107 let hash = header.hash();
108
109 tx.put::<CanonicalHeaders>(header.number, hash).unwrap();
110 tx.put::<Headers>(header.number, header.clone_header()).unwrap();
111 tx.put::<HeaderNumbers>(hash, header.number).unwrap();
112 }
113 provider_rw.commit().unwrap();
114
115 {
117 let manager = factory.static_file_provider();
118 let mut writer = manager.latest_writer(StaticFileSegment::Headers).unwrap();
119
120 for header in headers.clone() {
121 let hash = header.hash();
122 writer.append_header(&header.unseal(), &hash).unwrap();
123 }
124 writer.commit().unwrap();
125 }
126
127 {
129 let db_provider = factory.provider().unwrap();
130 let manager = db_provider.static_file_provider();
131 let jar_provider = manager
132 .get_segment_provider_for_block(StaticFileSegment::Headers, 0, Some(&static_file))
133 .unwrap();
134
135 assert!(!headers.is_empty());
136
137 headers.shuffle(&mut generators::rng());
139
140 for header in headers {
141 let header_hash = header.hash();
142 let header = header.unseal();
143
144 assert_eq!(header, db_provider.header(header_hash).unwrap().unwrap());
146 assert_eq!(header, jar_provider.header_by_number(header.number).unwrap().unwrap());
147 }
148 }
149 }
150
    /// Writes headers `0..=tip` across several static files, then prunes them in stages and
    /// checks the highest block, the header at the tip, and the on-disk file count after each
    /// stage.
    #[test]
    fn test_header_truncation() {
        let (static_dir, _) = create_test_static_files_dir();

        // Number of headers stored per static file.
        let blocks_per_file = 10;
        // Number of directory entries expected per static file range.
        // NOTE(review): presumably one data file plus its auxiliary files; confirm.
        let files_per_range = 3;
        // Number of static file ranges the test creates.
        let file_set_count = 3;
        let initial_file_count = files_per_range * file_set_count;
        // Highest block number written initially (29 with the values above).
        let tip = blocks_per_file * file_set_count - 1;
        // Create and commit headers `0..=tip` using a provider that is dropped afterwards.
        {
            let sf_rw = StaticFileProviderBuilder::<EthPrimitives>::read_write(&static_dir)
                .expect("Failed to create static file provider builder")
                .with_blocks_per_file(blocks_per_file)
                .build()
                .expect("Failed to build static file provider");

            let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap();

            let mut header = Header::default();
            for num in 0..=tip {
                header.number = num;
                header_writer.append_header(&header, &BlockHash::default()).unwrap();
            }
            header_writer.commit().unwrap();
        }

        /// Prunes `prune_count` headers through `writer`, commits, and validates the result.
        ///
        /// Checks that the highest static file block equals `expected_tip`, that the header at
        /// that block (if any) carries the same number, and that `static_dir` holds
        /// `expected_file_count` entries besides the lockfile. Mismatches are returned as errors.
        fn prune_and_validate(
            writer: &mut StaticFileProviderRWRefMut<'_, EthPrimitives>,
            sf_rw: &StaticFileProvider<EthPrimitives>,
            static_dir: impl AsRef<Path>,
            prune_count: u64,
            expected_tip: Option<u64>,
            expected_file_count: u64,
        ) -> eyre::Result<()> {
            writer.prune_headers(prune_count)?;
            writer.commit()?;

            // Highest block reported by the provider.
            assert_eyre(
                sf_rw.get_highest_static_file_block(StaticFileSegment::Headers),
                expected_tip,
                "block mismatch",
            )?;

            // The header at the new tip must still be readable and carry the tip's number.
            if let Some(id) = expected_tip {
                assert_eyre(
                    sf_rw.header_by_number(id)?.map(|h| h.number),
                    expected_tip,
                    "header mismatch",
                )?;
            }

            // Number of entries left in the static file directory.
            assert_eyre(
                count_files_without_lockfile(static_dir)?,
                expected_file_count as usize,
                "file count mismatch",
            )?;

            Ok(())
        }

        // Each case is `(prune_count, expected_tip, expected_file_count)`. The cases run in
        // order against the same writer, so `tmp_tip` tracks the tip left by the previous case.
        type PruneCount = u64;
        type ExpectedTip = u64;
        type ExpectedFileCount = u64;
        let mut tmp_tip = tip;
        let test_cases: Vec<(PruneCount, Option<ExpectedTip>, ExpectedFileCount)> = vec![
            // Case 0: prune a single header; every file is expected to remain.
            {
                tmp_tip -= 1;
                (1, Some(tmp_tip), initial_file_count)
            },
            // Case 1: prune the remaining `blocks_per_file - 1` headers of the last file; one
            // file set is expected to disappear.
            {
                tmp_tip -= blocks_per_file - 1;
                (blocks_per_file - 1, Some(tmp_tip), initial_file_count - files_per_range)
            },
            // Case 2: prune `blocks_per_file + 1` headers, more than one file holds; a second
            // file set is expected to disappear.
            {
                tmp_tip -= blocks_per_file + 1;
                (blocks_per_file + 1, Some(tmp_tip), initial_file_count - files_per_range * 2)
            },
            // Case 3: prune `tmp_tip` headers so that only block 0 is left; one file set is
            // expected to remain.
            {
                (
                    tmp_tip,
                    Some(0),
                    files_per_range,
                )
            },
            // Case 4: prune block 0 as well; no tip is left but the file count is expected to
            // stay at one file set.
            {
                (
                    1,
                    None,
                    files_per_range,
                )
            },
        ];

        // Reopen the provider, confirm the starting state, then run the cases in order.
        {
            let sf_rw = StaticFileProviderBuilder::read_write(&static_dir)
                .expect("Failed to create static file provider builder")
                .with_blocks_per_file(blocks_per_file)
                .build()
                .expect("Failed to build static file provider");

            assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(tip));
            assert_eq!(
                count_files_without_lockfile(static_dir.as_ref()).unwrap(),
                initial_file_count as usize
            );

            let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap();

            for (case, (prune_count, expected_tip, expected_file_count)) in
                test_cases.into_iter().enumerate()
            {
                prune_and_validate(
                    &mut header_writer,
                    &sf_rw,
                    &static_dir,
                    prune_count,
                    expected_tip,
                    expected_file_count,
                )
                // Prefix the error with the case index so a failure names its case.
                .map_err(|err| eyre::eyre!("Test case {case}: {err}"))
                .unwrap();
            }
        }
    }
289
    /// Fills three consecutive block ranges of `blocks_per_file` blocks each for a tx-based
    /// `segment` and asserts the resulting per-file tx ranges and the provider's tx index.
    ///
    /// The ranges receive `blocks_per_file - 1`, `0` and `1` entries respectively, numbered
    /// consecutively from 0.
    ///
    /// NOTE(review): the expected tx ranges and tx index below are literals that only match
    /// `blocks_per_file == 10`, which is the value passed by the caller visible in this file.
    ///
    /// # Panics
    ///
    /// Panics if `segment` is `StaticFileSegment::Headers`, on any writer error, or when an
    /// expectation does not hold.
    fn setup_tx_based_scenario(
        sf_rw: &StaticFileProvider<EthPrimitives>,
        segment: StaticFileSegment,
        blocks_per_file: u64,
    ) {
        /// Increments the writer through every block of `block_range`, appending one entry to
        /// each of the first `tx_count` blocks, then commits and checks the highest block and
        /// highest tx reported by `sf_rw`.
        ///
        /// `next_tx_num` is the number given to the next appended entry and is advanced in
        /// place, so numbering continues across calls.
        fn setup_block_ranges(
            writer: &mut StaticFileProviderRWRefMut<'_, EthPrimitives>,
            sf_rw: &StaticFileProvider<EthPrimitives>,
            segment: StaticFileSegment,
            block_range: &Range<u64>,
            mut tx_count: u64,
            next_tx_num: &mut u64,
        ) {
            let mut receipt = Receipt::default();
            let mut tx = TxLegacy::default();

            for block in block_range.clone() {
                writer.increment_block(block).unwrap();

                // At most one entry per block, until `tx_count` entries have been written.
                if tx_count > 0 {
                    match segment {
                        StaticFileSegment::Headers => panic!("non tx based segment"),
                        StaticFileSegment::Transactions => {
                            // The tx number doubles as the nonce so the stored value can be
                            // tied back to its number.
                            tx.nonce = *next_tx_num;
                            let tx: TransactionSigned =
                                tx.clone().into_signed(Signature::test_signature()).into();
                            writer.append_transaction(*next_tx_num, &tx).unwrap();
                        }
                        StaticFileSegment::Receipts => {
                            // Same idea: the tx number is stored as the cumulative gas used.
                            receipt.cumulative_gas_used = *next_tx_num;
                            writer.append_receipt(*next_tx_num, &receipt).unwrap();
                        }
                        StaticFileSegment::TransactionSenders => {
                            // Same idea: the sender address encodes the tx number.
                            let sender = Address::from(U160::from(*next_tx_num));
                            writer.append_transaction_sender(*next_tx_num, &sender).unwrap();
                        }
                    }
                    *next_tx_num += 1;
                    tx_count -= 1;
                }
            }
            writer.commit().unwrap();

            let expected_block = block_range.end - 1;
            // With all requested entries written, the highest tx is the last number handed out.
            // NOTE(review): `*next_tx_num - 1` underflows if no entry has ever been appended,
            // and the `else` branch (more entries requested than blocks) is not exercised by
            // the caller in this file.
            let expected_tx = if tx_count == 0 { *next_tx_num - 1 } else { *next_tx_num };

            assert_eq!(sf_rw.get_highest_static_file_block(segment), Some(expected_block),);
            assert_eq!(sf_rw.get_highest_static_file_tx(segment), Some(expected_tx),);
        }

        // Three consecutive ranges of `blocks_per_file` blocks each.
        let block_ranges = [
            0..blocks_per_file,
            blocks_per_file..blocks_per_file * 2,
            blocks_per_file * 2..blocks_per_file * 3,
        ];

        // Entries per range: all blocks but one, none, and exactly one.
        let tx_counts = [
            blocks_per_file - 1,
            0,
            1,
        ];

        let mut writer = sf_rw.latest_writer(segment).unwrap();
        let mut next_tx_num = 0;

        for (block_range, tx_count) in block_ranges.iter().zip(tx_counts.iter()) {
            setup_block_ranges(
                &mut writer,
                sf_rw,
                segment,
                block_range,
                *tx_count,
                &mut next_tx_num,
            );
        }

        // Expected tx range stored in the user header of each range's file.
        let expected_tx_ranges = vec![
            Some(SegmentRangeInclusive::new(0, 8)),
            None,
            Some(SegmentRangeInclusive::new(9, 9)),
        ];

        block_ranges.iter().zip(expected_tx_ranges).for_each(|(block_range, expected_tx_range)| {
            assert_eq!(
                sf_rw
                    .get_segment_provider_for_block(segment, block_range.start, None)
                    .unwrap()
                    .user_header()
                    .tx_range(),
                expected_tx_range
            );
        });

        // Expected tx index: one entry per range that holds entries; the empty middle range
        // has none.
        let expected_tx_index = BTreeMap::from([
            (8, SegmentRangeInclusive::new(0, 9)),
            (9, SegmentRangeInclusive::new(20, 29)),
        ]);
        assert_eq!(
            sf_rw.tx_index(segment),
            (!expected_tx_index.is_empty()).then_some(expected_tx_index),
            "tx index mismatch",
        );
    }
409
410 #[test]
411 fn test_tx_based_truncation() {
412 let segments = [StaticFileSegment::Transactions, StaticFileSegment::Receipts];
413 let blocks_per_file = 10; let files_per_range = 3; let file_set_count = 3; let initial_file_count = files_per_range * file_set_count;
417
418 #[expect(clippy::too_many_arguments)]
419 fn prune_and_validate(
420 sf_rw: &StaticFileProvider<EthPrimitives>,
421 static_dir: impl AsRef<Path>,
422 segment: StaticFileSegment,
423 prune_count: u64,
424 last_block: u64,
425 expected_tx_tip: Option<u64>,
426 expected_file_count: i32,
427 expected_tx_index: BTreeMap<TxNumber, SegmentRangeInclusive>,
428 ) -> eyre::Result<()> {
429 let mut writer = sf_rw.latest_writer(segment)?;
430
431 match segment {
433 StaticFileSegment::Headers => panic!("non tx based segment"),
434 StaticFileSegment::Transactions => {
435 writer.prune_transactions(prune_count, last_block)?
436 }
437 StaticFileSegment::Receipts => writer.prune_receipts(prune_count, last_block)?,
438 StaticFileSegment::TransactionSenders => {
439 writer.prune_transaction_senders(prune_count, last_block)?
440 }
441 }
442 writer.commit()?;
443
444 assert_eyre(
446 sf_rw.get_highest_static_file_block(segment),
447 Some(last_block),
448 "block mismatch",
449 )?;
450 assert_eyre(sf_rw.get_highest_static_file_tx(segment), expected_tx_tip, "tx mismatch")?;
451
452 if let Some(id) = expected_tx_tip {
455 match segment {
456 StaticFileSegment::Headers => panic!("non tx based segment"),
457 StaticFileSegment::Transactions => assert_eyre(
458 expected_tx_tip,
459 sf_rw.transaction_by_id(id)?.map(|t| t.nonce()),
460 "tx mismatch",
461 )?,
462 StaticFileSegment::Receipts => assert_eyre(
463 expected_tx_tip,
464 sf_rw.receipt(id)?.map(|r| r.cumulative_gas_used),
465 "receipt mismatch",
466 )?,
467 StaticFileSegment::TransactionSenders => assert_eyre(
468 expected_tx_tip,
469 sf_rw
470 .transaction_sender(id)?
471 .map(|s| u64::try_from(U160::from_be_bytes(s.0.into())).unwrap()),
472 "sender mismatch",
473 )?,
474 }
475 }
476
477 assert_eyre(
479 count_files_without_lockfile(static_dir)?,
480 expected_file_count as usize,
481 "file count mismatch",
482 )?;
483
484 assert_eyre(
486 sf_rw.tx_index(segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
487 (!expected_tx_index.is_empty()).then_some(expected_tx_index),
488 "tx index mismatch",
489 )?;
490
491 Ok(())
492 }
493
494 for segment in segments {
495 let (static_dir, _) = create_test_static_files_dir();
496
497 let sf_rw = StaticFileProviderBuilder::read_write(&static_dir)
498 .expect("Failed to create static file provider builder")
499 .with_blocks_per_file(blocks_per_file)
500 .build()
501 .expect("Failed to build static file provider");
502
503 setup_tx_based_scenario(&sf_rw, segment, blocks_per_file);
504
505 let sf_rw = StaticFileProviderBuilder::read_write(&static_dir)
506 .expect("Failed to create static file provider builder")
507 .with_blocks_per_file(blocks_per_file)
508 .build()
509 .expect("Failed to build static file provider");
510 let highest_tx = sf_rw.get_highest_static_file_tx(segment).unwrap();
511
512 let test_cases = vec![
515 (
520 1,
521 blocks_per_file * 2,
522 Some(highest_tx - 1),
523 initial_file_count,
524 BTreeMap::from([(highest_tx - 1, SegmentRangeInclusive::new(0, 9))]),
525 ),
526 (
530 0,
531 blocks_per_file - 1,
532 Some(highest_tx - 1),
533 files_per_range,
534 BTreeMap::from([(highest_tx - 1, SegmentRangeInclusive::new(0, 9))]),
535 ),
536 (
538 highest_tx - 1,
539 1,
540 Some(0),
541 files_per_range,
542 BTreeMap::from([(0, SegmentRangeInclusive::new(0, 1))]),
543 ),
544 (1, 0, None, files_per_range, BTreeMap::from([])),
546 ];
547
548 for (
550 case,
551 (prune_count, last_block, expected_tx_tip, expected_file_count, expected_tx_index),
552 ) in test_cases.into_iter().enumerate()
553 {
554 prune_and_validate(
555 &sf_rw,
556 &static_dir,
557 segment,
558 prune_count,
559 last_block,
560 expected_tx_tip,
561 expected_file_count,
562 expected_tx_index,
563 )
564 .map_err(|err| eyre::eyre!("Test case {case}: {err}"))
565 .unwrap();
566 }
567 }
568 }
569
570 fn count_files_without_lockfile(path: impl AsRef<Path>) -> eyre::Result<usize> {
572 let is_lockfile = |entry: &fs::DirEntry| {
573 entry.path().file_name().map(|name| name == "lock").unwrap_or(false)
574 };
575 let count = fs::read_dir(path)?
576 .filter_map(|entry| entry.ok())
577 .filter(|entry| !is_lockfile(entry))
578 .count();
579
580 Ok(count)
581 }
582
583 #[test]
584 fn test_dynamic_size() -> eyre::Result<()> {
585 let (static_dir, _) = create_test_static_files_dir();
586
587 {
588 let sf_rw = StaticFileProviderBuilder::<EthPrimitives>::read_write(&static_dir)?
589 .with_blocks_per_file(10)
590 .build()?;
591 let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers)?;
592
593 let mut header = Header::default();
594 for num in 0..=15 {
595 header.number = num;
596 header_writer.append_header(&header, &BlockHash::default()).unwrap();
597 }
598 header_writer.commit().unwrap();
599
600 assert_eq!(sf_rw.headers_range(0..=15)?.len(), 16);
601 assert_eq!(
602 sf_rw.expected_block_index(StaticFileSegment::Headers),
603 Some(BTreeMap::from([
604 (9, SegmentRangeInclusive::new(0, 9)),
605 (19, SegmentRangeInclusive::new(10, 19))
606 ])),
607 )
608 }
609
610 {
611 let sf_rw = StaticFileProviderBuilder::<EthPrimitives>::read_write(&static_dir)?
612 .with_blocks_per_file(5)
613 .build()?;
614 let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers)?;
615
616 let mut header = Header::default();
617 for num in 16..=22 {
618 header.number = num;
619 header_writer.append_header(&header, &BlockHash::default()).unwrap();
620 }
621 header_writer.commit().unwrap();
622
623 assert_eq!(sf_rw.headers_range(0..=22)?.len(), 23);
624 assert_eq!(
625 sf_rw.expected_block_index(StaticFileSegment::Headers),
626 Some(BTreeMap::from([
627 (9, SegmentRangeInclusive::new(0, 9)),
628 (19, SegmentRangeInclusive::new(10, 19)),
629 (24, SegmentRangeInclusive::new(20, 24))
630 ]))
631 )
632 }
633
634 {
635 let sf_rw = StaticFileProviderBuilder::<EthPrimitives>::read_write(&static_dir)?
636 .with_blocks_per_file(15)
637 .build()?;
638 let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers)?;
639
640 let mut header = Header::default();
641 for num in 23..=40 {
642 header.number = num;
643 header_writer.append_header(&header, &BlockHash::default()).unwrap();
644 }
645 header_writer.commit().unwrap();
646
647 assert_eq!(sf_rw.headers_range(0..=40)?.len(), 41);
648 assert_eq!(
649 sf_rw.expected_block_index(StaticFileSegment::Headers),
650 Some(BTreeMap::from([
651 (9, SegmentRangeInclusive::new(0, 9)),
652 (19, SegmentRangeInclusive::new(10, 19)),
653 (24, SegmentRangeInclusive::new(20, 24)),
654 (39, SegmentRangeInclusive::new(25, 39)),
655 (54, SegmentRangeInclusive::new(40, 54))
656 ]))
657 )
658 }
659
660 Ok(())
661 }
662}