1mod manager;
2pub use manager::{StaticFileAccess, StaticFileProvider, StaticFileWriter};
3
4mod jar;
5pub use jar::StaticFileJarProvider;
6
7mod writer;
8pub use writer::{StaticFileProviderRW, StaticFileProviderRWRefMut};
9
10mod metrics;
11use reth_nippy_jar::NippyJar;
12use reth_static_file_types::{SegmentHeader, StaticFileSegment};
13use reth_storage_errors::provider::{ProviderError, ProviderResult};
14use std::{ops::Deref, sync::Arc};
15
16type LoadedJarRef<'a> = dashmap::mapref::one::Ref<'a, (u64, StaticFileSegment), LoadedJar>;
18
19#[derive(Debug)]
21pub struct LoadedJar {
22 jar: NippyJar<SegmentHeader>,
23 mmap_handle: Arc<reth_nippy_jar::DataReader>,
24}
25
26impl LoadedJar {
27 fn new(jar: NippyJar<SegmentHeader>) -> ProviderResult<Self> {
28 match jar.open_data_reader() {
29 Ok(data_reader) => {
30 let mmap_handle = Arc::new(data_reader);
31 Ok(Self { jar, mmap_handle })
32 }
33 Err(e) => Err(ProviderError::other(e)),
34 }
35 }
36
37 fn mmap_handle(&self) -> Arc<reth_nippy_jar::DataReader> {
39 self.mmap_handle.clone()
40 }
41
42 const fn segment(&self) -> StaticFileSegment {
43 self.jar.user_header().segment()
44 }
45}
46
47impl Deref for LoadedJar {
48 type Target = NippyJar<SegmentHeader>;
49 fn deref(&self) -> &Self::Target {
50 &self.jar
51 }
52}
53
54#[cfg(test)]
55mod tests {
56 use super::*;
57 use crate::{
58 test_utils::create_test_provider_factory, HeaderProvider, StaticFileProviderFactory,
59 };
60 use alloy_consensus::{Header, SignableTransaction, Transaction, TxLegacy};
61 use alloy_primitives::{BlockHash, Signature, TxNumber, B256, U256};
62 use rand::seq::SliceRandom;
63 use reth_db::test_utils::create_test_static_files_dir;
64 use reth_db_api::{
65 transaction::DbTxMut, CanonicalHeaders, HeaderNumbers, HeaderTerminalDifficulties, Headers,
66 };
67 use reth_ethereum_primitives::{EthPrimitives, Receipt, TransactionSigned};
68 use reth_static_file_types::{
69 find_fixed_range, SegmentRangeInclusive, DEFAULT_BLOCKS_PER_STATIC_FILE,
70 };
71 use reth_storage_api::{ReceiptProvider, TransactionsProvider};
72 use reth_testing_utils::generators::{self, random_header_range};
73 use std::{fmt::Debug, fs, ops::Range, path::Path};
74
75 fn assert_eyre<T: PartialEq + Debug>(got: T, expected: T, msg: &str) -> eyre::Result<()> {
76 if got != expected {
77 eyre::bail!("{msg} | got: {got:?} expected: {expected:?})");
78 }
79 Ok(())
80 }
81
/// Writes 100 random headers to both the database and a headers static file, then checks that
/// the database provider and the static-file jar provider return the same header and total
/// difficulty for every block.
#[test]
fn test_snap() {
    // Block numbers covered by the test: 0..=99.
    let total_rows = 100u64;
    let first_block = 0u64;
    let last_block = total_rows - 1;

    // Data sources.
    let factory = create_test_provider_factory();
    let scratch_dir = tempfile::tempdir().unwrap();
    let fixed_range = find_fixed_range(last_block, DEFAULT_BLOCKS_PER_STATIC_FILE);
    let static_file = scratch_dir.path().join(StaticFileSegment::Headers.filename(&fixed_range));

    // Random headers for every block in the range.
    let mut headers =
        random_header_range(&mut generators::rng(), first_block..(last_block + 1), B256::random());

    // Store the headers in the database tables, tracking the running total difficulty.
    let mut provider_rw = factory.provider_rw().unwrap();
    let tx = provider_rw.tx_mut();
    let mut running_td = U256::ZERO;
    for sealed in &headers {
        running_td += sealed.header().difficulty;
        let number = sealed.number;
        let hash = sealed.hash();

        tx.put::<CanonicalHeaders>(number, hash).unwrap();
        tx.put::<Headers>(number, sealed.clone_header()).unwrap();
        tx.put::<HeaderTerminalDifficulties>(number, running_td.into()).unwrap();
        tx.put::<HeaderNumbers>(hash, number).unwrap();
    }
    provider_rw.commit().unwrap();

    // Write the same headers into the headers static file.
    {
        let manager = factory.static_file_provider();
        let mut writer = manager.latest_writer(StaticFileSegment::Headers).unwrap();
        let mut running_td = U256::ZERO;

        for sealed in &headers {
            running_td += sealed.header().difficulty;
            let hash = sealed.hash();
            writer.append_header(sealed.header(), running_td, &hash).unwrap();
        }
        writer.commit().unwrap();
    }

    // Query both providers and compare their answers.
    {
        let db_provider = factory.provider().unwrap();
        let manager = db_provider.static_file_provider();
        let jar_provider = manager
            .get_segment_provider_from_block(StaticFileSegment::Headers, 0, Some(&static_file))
            .unwrap();

        assert!(!headers.is_empty());

        // Visit the headers in random order so lookups are not sequential.
        headers.shuffle(&mut generators::rng());

        for sealed in headers {
            let hash = sealed.hash();
            let header = sealed.unseal();

            // Same header from the database and from the static file.
            assert_eq!(header, db_provider.header(&hash).unwrap().unwrap());
            assert_eq!(header, jar_provider.header_by_number(header.number).unwrap().unwrap());

            // Same total difficulty from both sources.
            assert_eq!(
                db_provider.header_td(&hash).unwrap().unwrap(),
                jar_provider.header_td_by_number(header.number).unwrap().unwrap()
            );
        }
    }
}
160
/// Fills three header static-file ranges, then prunes them step by step and checks the highest
/// block, the header stored at that block, and the number of files left on disk after each step.
#[test]
fn test_header_truncation() {
    /// Prunes `prune_count` headers through `writer`, commits, and validates the resulting state.
    ///
    /// Checks, in order: the highest headers block reported by `sf_rw`, the number of the header
    /// stored at `expected_tip` (when there is one), and the count of non-lock files in
    /// `static_dir`.
    fn prune_and_validate(
        writer: &mut StaticFileProviderRWRefMut<'_, EthPrimitives>,
        sf_rw: &StaticFileProvider<EthPrimitives>,
        static_dir: impl AsRef<Path>,
        prune_count: u64,
        expected_tip: Option<u64>,
        expected_file_count: u64,
    ) -> eyre::Result<()> {
        writer.prune_headers(prune_count)?;
        writer.commit()?;

        // The provider must report the new highest block.
        let highest_block = sf_rw.get_highest_static_file_block(StaticFileSegment::Headers);
        assert_eyre(highest_block, expected_tip, "block mismatch")?;

        // The header at the new tip must still be readable and carry the right number.
        if let Some(block) = expected_tip {
            let stored_number = sf_rw.header_by_number(block)?.map(|header| header.number);
            assert_eyre(stored_number, expected_tip, "header mismatch")?;
        }

        // Files are removed only when a whole range is pruned away.
        let files_on_disk = count_files_without_lockfile(static_dir)?;
        assert_eyre(files_on_disk, expected_file_count as usize, "file count mismatch")?;

        Ok(())
    }

    let (static_dir, _) = create_test_static_files_dir();

    // Number of headers stored per static file.
    let blocks_per_file = 10;
    // Files on disk per block range (presumably data plus auxiliary files — TODO confirm).
    let files_per_range = 3;
    // Number of block ranges to fill.
    let file_set_count = 3;
    let initial_file_count = files_per_range * file_set_count;
    // Highest block written during setup (29 with the values above).
    let tip = blocks_per_file * file_set_count - 1;

    // Create and commit headers 0..=tip.
    {
        let sf_rw = StaticFileProvider::<EthPrimitives>::read_write(&static_dir)
            .expect("Failed to create static file provider")
            .with_custom_blocks_per_file(blocks_per_file);

        let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap();

        // Only the block number differs between the appended headers.
        let mut header = Header::default();
        for number in 0..=tip {
            header.number = number;
            header_writer.append_header(&header, U256::default(), &BlockHash::default()).unwrap();
        }
        header_writer.commit().unwrap();
    }

    // Expected tips after each of the first three pruning steps.
    let tip_after_case0 = tip - 1;
    let tip_after_case1 = tip_after_case0 - (blocks_per_file - 1);
    let tip_after_case2 = tip_after_case1 - (blocks_per_file + 1);

    // (prune count, expected tip, expected file count)
    let test_cases: Vec<(u64, Option<u64>, u64)> = vec![
        // Case 0: prune a single header; no file is removed.
        (1, Some(tip_after_case0), initial_file_count),
        // Case 1: prune the rest of the last range; its files are removed.
        (blocks_per_file - 1, Some(tip_after_case1), initial_file_count - files_per_range),
        // Case 2: prune one header more than a full range; another set of files is removed.
        (blocks_per_file + 1, Some(tip_after_case2), initial_file_count - files_per_range * 2),
        // Case 3: prune everything except block 0; the first range's files stay.
        (tip_after_case2, Some(0), files_per_range),
        // Case 4: prune block 0 as well; the first range's files still stay.
        (1, None, files_per_range),
    ];

    // Reopen the provider, then prune and validate case by case.
    {
        let sf_rw = StaticFileProvider::read_write(&static_dir)
            .expect("Failed to create static file provider")
            .with_custom_blocks_per_file(blocks_per_file);

        assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(tip));
        assert_eq!(
            count_files_without_lockfile(&static_dir).unwrap(),
            initial_file_count as usize
        );

        let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap();

        for (case, (prune_count, expected_tip, expected_file_count)) in
            test_cases.into_iter().enumerate()
        {
            let outcome = prune_and_validate(
                &mut header_writer,
                &sf_rw,
                &static_dir,
                prune_count,
                expected_tip,
                expected_file_count,
            );
            outcome.map_err(|err| eyre::eyre!("Test case {case}: {err}")).unwrap();
        }
    }
}
297
298 fn setup_tx_based_scenario(
305 sf_rw: &StaticFileProvider<EthPrimitives>,
306 segment: StaticFileSegment,
307 blocks_per_file: u64,
308 ) {
309 fn setup_block_ranges(
310 writer: &mut StaticFileProviderRWRefMut<'_, EthPrimitives>,
311 sf_rw: &StaticFileProvider<EthPrimitives>,
312 segment: StaticFileSegment,
313 block_range: &Range<u64>,
314 mut tx_count: u64,
315 next_tx_num: &mut u64,
316 ) {
317 let mut receipt = Receipt::default();
318 let mut tx = TxLegacy::default();
319
320 for block in block_range.clone() {
321 writer.increment_block(block).unwrap();
322
323 if tx_count > 0 {
325 if segment.is_receipts() {
326 receipt.cumulative_gas_used = *next_tx_num;
328 writer.append_receipt(*next_tx_num, &receipt).unwrap();
329 } else {
330 tx.nonce = *next_tx_num;
332 let tx: TransactionSigned =
333 tx.clone().into_signed(Signature::test_signature()).into();
334 writer.append_transaction(*next_tx_num, &tx).unwrap();
335 }
336 *next_tx_num += 1;
337 tx_count -= 1;
338 }
339 }
340 writer.commit().unwrap();
341
342 let expected_block = block_range.end - 1;
344 let expected_tx = if tx_count == 0 { *next_tx_num - 1 } else { *next_tx_num };
345
346 assert_eq!(sf_rw.get_highest_static_file_block(segment), Some(expected_block),);
348 assert_eq!(sf_rw.get_highest_static_file_tx(segment), Some(expected_tx),);
349 }
350
351 let block_ranges = [
353 0..blocks_per_file,
354 blocks_per_file..blocks_per_file * 2,
355 blocks_per_file * 2..blocks_per_file * 3,
356 ];
357
358 let tx_counts = [
359 blocks_per_file - 1, 0, 1, ];
363
364 let mut writer = sf_rw.latest_writer(segment).unwrap();
365 let mut next_tx_num = 0;
366
367 for (block_range, tx_count) in block_ranges.iter().zip(tx_counts.iter()) {
369 setup_block_ranges(
370 &mut writer,
371 sf_rw,
372 segment,
373 block_range,
374 *tx_count,
375 &mut next_tx_num,
376 );
377 }
378
379 let expected_tx_ranges = vec![
381 Some(SegmentRangeInclusive::new(0, 8)),
382 None,
383 Some(SegmentRangeInclusive::new(9, 9)),
384 ];
385
386 block_ranges.iter().zip(expected_tx_ranges).for_each(|(block_range, expected_tx_range)| {
387 assert_eq!(
388 sf_rw
389 .get_segment_provider_from_block(segment, block_range.start, None)
390 .unwrap()
391 .user_header()
392 .tx_range(),
393 expected_tx_range.as_ref()
394 );
395 });
396
397 let tx_index = sf_rw.tx_index().read();
399 let expected_tx_index =
400 vec![(8, SegmentRangeInclusive::new(0, 9)), (9, SegmentRangeInclusive::new(20, 29))];
401 assert_eq!(
402 tx_index.get(&segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
403 (!expected_tx_index.is_empty()).then_some(expected_tx_index),
404 "tx index mismatch",
405 );
406 }
407
/// Prunes transactions and receipts from the scenario built by `setup_tx_based_scenario` and
/// checks the highest block, highest tx, stored entry, file count and tx index after each step.
#[test]
fn test_tx_based_truncation() {
    /// Prunes `prune_count` entries from `segment` down to `last_block`, commits, and validates
    /// the resulting state.
    ///
    /// An empty `expected_tx_index` means the segment must have no entry in the tx index.
    #[expect(clippy::too_many_arguments)]
    fn prune_and_validate(
        sf_rw: &StaticFileProvider<EthPrimitives>,
        static_dir: impl AsRef<Path>,
        segment: StaticFileSegment,
        prune_count: u64,
        last_block: u64,
        expected_tx_tip: Option<u64>,
        expected_file_count: usize,
        expected_tx_index: Vec<(TxNumber, SegmentRangeInclusive)>,
    ) -> eyre::Result<()> {
        let mut writer = sf_rw.latest_writer(segment)?;

        // Prune through the method that matches the segment.
        if segment.is_receipts() {
            writer.prune_receipts(prune_count, last_block)?;
        } else {
            writer.prune_transactions(prune_count, last_block)?;
        }
        writer.commit()?;

        // Highest block and highest tx reported by the provider.
        assert_eyre(
            sf_rw.get_highest_static_file_block(segment),
            Some(last_block),
            "block mismatch",
        )?;
        assert_eyre(sf_rw.get_highest_static_file_tx(segment), expected_tx_tip, "tx mismatch")?;

        // The entry at the new tip must be readable. The setup stored the tx number in
        // `cumulative_gas_used` (receipts) or `nonce` (transactions), so it is read back from
        // there. The stored value is passed as `got`, so a failure labels the values correctly.
        if let Some(id) = expected_tx_tip {
            let stored_tx_number = if segment.is_receipts() {
                sf_rw.receipt(id)?.map(|r| r.cumulative_gas_used)
            } else {
                sf_rw.transaction_by_id(id)?.map(|t| t.nonce())
            };
            assert_eyre(stored_tx_number, expected_tx_tip, "tx mismatch")?;
        }

        // Number of files left on disk.
        assert_eyre(
            count_files_without_lockfile(static_dir)?,
            expected_file_count,
            "file count mismatch",
        )?;

        // Contents of the tx index for this segment.
        let tx_index = sf_rw.tx_index().read();
        assert_eyre(
            tx_index.get(&segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
            (!expected_tx_index.is_empty()).then_some(expected_tx_index),
            "tx index mismatch",
        )?;

        Ok(())
    }

    let segments = [StaticFileSegment::Transactions, StaticFileSegment::Receipts];
    // Number of blocks per static file.
    let blocks_per_file = 10;
    // Files on disk per block range (presumably data plus auxiliary files — TODO confirm).
    let files_per_range: usize = 3;
    // Number of block ranges created by the setup.
    let file_set_count: usize = 3;
    let initial_file_count = files_per_range * file_set_count;

    for segment in segments {
        let (static_dir, _) = create_test_static_files_dir();

        let sf_rw = StaticFileProvider::read_write(&static_dir)
            .expect("Failed to create static file provider")
            .with_custom_blocks_per_file(blocks_per_file);

        setup_tx_based_scenario(&sf_rw, segment, blocks_per_file);

        // Reopen the provider before pruning.
        let sf_rw = StaticFileProvider::read_write(&static_dir)
            .expect("Failed to create static file provider")
            .with_custom_blocks_per_file(blocks_per_file);
        let highest_tx = sf_rw.get_highest_static_file_tx(segment).unwrap();

        // (prune count, last block, expected tx tip, expected file count, expected tx index)
        let test_cases = vec![
            // Case 0: prune the only tx of the third range while staying on its first block;
            // no file is removed.
            (
                1,
                blocks_per_file * 2,
                Some(highest_tx - 1),
                initial_file_count,
                vec![(highest_tx - 1, SegmentRangeInclusive::new(0, 9))],
            ),
            // Case 1: prune nothing but move back to the last block of the first range; only
            // the first range's files remain.
            (
                0,
                blocks_per_file - 1,
                Some(highest_tx - 1),
                files_per_range,
                vec![(highest_tx - 1, SegmentRangeInclusive::new(0, 9))],
            ),
            // Case 2: prune all but the first tx, down to block 1.
            (
                highest_tx - 1,
                1,
                Some(0),
                files_per_range,
                vec![(0, SegmentRangeInclusive::new(0, 1))],
            ),
            // Case 3: prune the remaining tx; the files stay and the tx index becomes empty.
            (1, 0, None, files_per_range, vec![]),
        ];

        for (
            case,
            (prune_count, last_block, expected_tx_tip, expected_file_count, expected_tx_index),
        ) in test_cases.into_iter().enumerate()
        {
            prune_and_validate(
                &sf_rw,
                &static_dir,
                segment,
                prune_count,
                last_block,
                expected_tx_tip,
                expected_file_count,
                expected_tx_index,
            )
            .map_err(|err| eyre::eyre!("Test case {case}: {err}"))
            .unwrap();
        }
    }
}
552
553 fn count_files_without_lockfile(path: impl AsRef<Path>) -> eyre::Result<usize> {
555 let is_lockfile = |entry: &fs::DirEntry| {
556 entry.path().file_name().map(|name| name == "lock").unwrap_or(false)
557 };
558 let count = fs::read_dir(path)?
559 .filter_map(|entry| entry.ok())
560 .filter(|entry| !is_lockfile(entry))
561 .count();
562
563 Ok(count)
564 }
565}