reth_provider/providers/static_file/
mod.rs

1mod manager;
2pub use manager::{
3    StaticFileAccess, StaticFileProvider, StaticFileProviderBuilder, StaticFileWriter,
4};
5
6mod jar;
7pub use jar::StaticFileJarProvider;
8
9mod writer;
10pub use writer::{StaticFileProviderRW, StaticFileProviderRWRefMut};
11
12mod metrics;
13use reth_nippy_jar::NippyJar;
14use reth_static_file_types::{SegmentHeader, StaticFileSegment};
15use reth_storage_errors::provider::{ProviderError, ProviderResult};
16use std::{ops::Deref, sync::Arc};
17
/// Alias for a shared `dashmap` entry reference whose key is `(u64, StaticFileSegment)` and whose
/// value is a [`LoadedJar`].
// NOTE(review): the `u64` key component presumably identifies the block range covered by the
// file — confirm against the map declared in `manager`.
type LoadedJarRef<'a> = dashmap::mapref::one::Ref<'a, (u64, StaticFileSegment), LoadedJar>;
20
/// Helper type to reuse an associated static file mmap handle on created cursors.
///
/// Bundles a [`NippyJar`] with a reference-counted [`reth_nippy_jar::DataReader`] opened from it,
/// so the same reader can be handed out repeatedly through `Arc` clones.
#[derive(Debug)]
pub struct LoadedJar {
    /// The wrapped jar; also reachable through the `Deref` implementation.
    jar: NippyJar<SegmentHeader>,
    /// Data reader obtained from `jar` when the `LoadedJar` is constructed.
    mmap_handle: Arc<reth_nippy_jar::DataReader>,
}
27
28impl LoadedJar {
29    fn new(jar: NippyJar<SegmentHeader>) -> ProviderResult<Self> {
30        match jar.open_data_reader() {
31            Ok(data_reader) => {
32                let mmap_handle = Arc::new(data_reader);
33                Ok(Self { jar, mmap_handle })
34            }
35            Err(e) => Err(ProviderError::other(e)),
36        }
37    }
38
39    /// Returns a clone of the mmap handle that can be used to instantiate a cursor.
40    fn mmap_handle(&self) -> Arc<reth_nippy_jar::DataReader> {
41        self.mmap_handle.clone()
42    }
43
44    const fn segment(&self) -> StaticFileSegment {
45        self.jar.user_header().segment()
46    }
47}
48
49impl Deref for LoadedJar {
50    type Target = NippyJar<SegmentHeader>;
51    fn deref(&self) -> &Self::Target {
52        &self.jar
53    }
54}
55
56#[cfg(test)]
57mod tests {
58    use super::*;
59    use crate::{
60        providers::static_file::manager::StaticFileProviderBuilder,
61        test_utils::create_test_provider_factory, HeaderProvider, StaticFileProviderFactory,
62    };
63    use alloy_consensus::{Header, SignableTransaction, Transaction, TxLegacy};
64    use alloy_primitives::{Address, BlockHash, Signature, TxNumber, B256, U160};
65    use rand::seq::SliceRandom;
66    use reth_db::test_utils::create_test_static_files_dir;
67    use reth_db_api::{transaction::DbTxMut, CanonicalHeaders, HeaderNumbers, Headers};
68    use reth_ethereum_primitives::{EthPrimitives, Receipt, TransactionSigned};
69    use reth_static_file_types::{
70        find_fixed_range, SegmentRangeInclusive, DEFAULT_BLOCKS_PER_STATIC_FILE,
71    };
72    use reth_storage_api::{ReceiptProvider, TransactionsProvider};
73    use reth_testing_utils::generators::{self, random_header_range};
74    use std::{collections::BTreeMap, fmt::Debug, fs, ops::Range, path::Path};
75
76    fn assert_eyre<T: PartialEq + Debug>(got: T, expected: T, msg: &str) -> eyre::Result<()> {
77        if got != expected {
78            eyre::bail!("{msg} | got: {got:?} expected: {expected:?}");
79        }
80        Ok(())
81    }
82
83    #[test]
84    fn test_static_files() {
85        // Ranges
86        let row_count = 100u64;
87        let range = 0..=(row_count - 1);
88
89        // Data sources
90        let factory = create_test_provider_factory();
91        let static_files_path = tempfile::tempdir().unwrap();
92        let static_file = static_files_path.path().join(
93            StaticFileSegment::Headers
94                .filename(&find_fixed_range(*range.end(), DEFAULT_BLOCKS_PER_STATIC_FILE)),
95        );
96
97        // Setup data
98        let mut headers = random_header_range(
99            &mut generators::rng(),
100            *range.start()..(*range.end() + 1),
101            B256::random(),
102        );
103
104        let mut provider_rw = factory.provider_rw().unwrap();
105        let tx = provider_rw.tx_mut();
106        for header in headers.clone() {
107            let hash = header.hash();
108
109            tx.put::<CanonicalHeaders>(header.number, hash).unwrap();
110            tx.put::<Headers>(header.number, header.clone_header()).unwrap();
111            tx.put::<HeaderNumbers>(hash, header.number).unwrap();
112        }
113        provider_rw.commit().unwrap();
114
115        // Create StaticFile
116        {
117            let manager = factory.static_file_provider();
118            let mut writer = manager.latest_writer(StaticFileSegment::Headers).unwrap();
119
120            for header in headers.clone() {
121                let hash = header.hash();
122                writer.append_header(&header.unseal(), &hash).unwrap();
123            }
124            writer.commit().unwrap();
125        }
126
127        // Use providers to query Header data and compare if it matches
128        {
129            let db_provider = factory.provider().unwrap();
130            let manager = db_provider.static_file_provider();
131            let jar_provider = manager
132                .get_segment_provider_for_block(StaticFileSegment::Headers, 0, Some(&static_file))
133                .unwrap();
134
135            assert!(!headers.is_empty());
136
137            // Shuffled for chaos.
138            headers.shuffle(&mut generators::rng());
139
140            for header in headers {
141                let header_hash = header.hash();
142                let header = header.unseal();
143
144                // Compare Header
145                assert_eq!(header, db_provider.header(header_hash).unwrap().unwrap());
146                assert_eq!(header, jar_provider.header_by_number(header.number).unwrap().unwrap());
147            }
148        }
149    }
150
151    #[test]
152    fn test_header_truncation() {
153        let (static_dir, _) = create_test_static_files_dir();
154
155        let blocks_per_file = 10; // Number of headers per file
156        let files_per_range = 3; // Number of files per range (data/conf/offset files)
157        let file_set_count = 3; // Number of sets of files to create
158        let initial_file_count = files_per_range * file_set_count;
159        let tip = blocks_per_file * file_set_count - 1; // Initial highest block (29 in this case)
160
161        // [ Headers Creation and Commit ]
162        {
163            let sf_rw = StaticFileProviderBuilder::<EthPrimitives>::read_write(&static_dir)
164                .expect("Failed to create static file provider builder")
165                .with_blocks_per_file(blocks_per_file)
166                .build()
167                .expect("Failed to build static file provider");
168
169            let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap();
170
171            // Append headers from 0 to the tip (29) and commit
172            let mut header = Header::default();
173            for num in 0..=tip {
174                header.number = num;
175                header_writer.append_header(&header, &BlockHash::default()).unwrap();
176            }
177            header_writer.commit().unwrap();
178        }
179
180        // Helper function to prune headers and validate truncation results
181        fn prune_and_validate(
182            writer: &mut StaticFileProviderRWRefMut<'_, EthPrimitives>,
183            sf_rw: &StaticFileProvider<EthPrimitives>,
184            static_dir: impl AsRef<Path>,
185            prune_count: u64,
186            expected_tip: Option<u64>,
187            expected_file_count: u64,
188        ) -> eyre::Result<()> {
189            writer.prune_headers(prune_count)?;
190            writer.commit()?;
191
192            // Validate the highest block after pruning
193            assert_eyre(
194                sf_rw.get_highest_static_file_block(StaticFileSegment::Headers),
195                expected_tip,
196                "block mismatch",
197            )?;
198
199            if let Some(id) = expected_tip {
200                assert_eyre(
201                    sf_rw.header_by_number(id)?.map(|h| h.number),
202                    expected_tip,
203                    "header mismatch",
204                )?;
205            }
206
207            // Validate the number of files remaining in the directory
208            assert_eyre(
209                count_files_without_lockfile(static_dir)?,
210                expected_file_count as usize,
211                "file count mismatch",
212            )?;
213
214            Ok(())
215        }
216
217        // [ Test Cases ]
218        type PruneCount = u64;
219        type ExpectedTip = u64;
220        type ExpectedFileCount = u64;
221        let mut tmp_tip = tip;
222        let test_cases: Vec<(PruneCount, Option<ExpectedTip>, ExpectedFileCount)> = vec![
223            // Case 0: Pruning 1 header
224            {
225                tmp_tip -= 1;
226                (1, Some(tmp_tip), initial_file_count)
227            },
228            // Case 1: Pruning remaining rows from file should result in its deletion
229            {
230                tmp_tip -= blocks_per_file - 1;
231                (blocks_per_file - 1, Some(tmp_tip), initial_file_count - files_per_range)
232            },
233            // Case 2: Pruning more headers than a single file has (tip reduced by
234            // blocks_per_file + 1) should result in a file set deletion
235            {
236                tmp_tip -= blocks_per_file + 1;
237                (blocks_per_file + 1, Some(tmp_tip), initial_file_count - files_per_range * 2)
238            },
239            // Case 3: Pruning all remaining headers from the file except the genesis header
240            {
241                (
242                    tmp_tip,
243                    Some(0),         // Only genesis block remains
244                    files_per_range, // The file set with block 0 should remain
245                )
246            },
247            // Case 4: Pruning the genesis header (should not delete the file set with block 0)
248            {
249                (
250                    1,
251                    None,            // No blocks left
252                    files_per_range, // The file set with block 0 remains
253                )
254            },
255        ];
256
257        // Test cases execution
258        {
259            let sf_rw = StaticFileProviderBuilder::read_write(&static_dir)
260                .expect("Failed to create static file provider builder")
261                .with_blocks_per_file(blocks_per_file)
262                .build()
263                .expect("Failed to build static file provider");
264
265            assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(tip));
266            assert_eq!(
267                count_files_without_lockfile(static_dir.as_ref()).unwrap(),
268                initial_file_count as usize
269            );
270
271            let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap();
272
273            for (case, (prune_count, expected_tip, expected_file_count)) in
274                test_cases.into_iter().enumerate()
275            {
276                prune_and_validate(
277                    &mut header_writer,
278                    &sf_rw,
279                    &static_dir,
280                    prune_count,
281                    expected_tip,
282                    expected_file_count,
283                )
284                .map_err(|err| eyre::eyre!("Test case {case}: {err}"))
285                .unwrap();
286            }
287        }
288    }
289
290    /// 3 block ranges are built
291    ///
292    /// for `blocks_per_file = 10`:
293    /// * `0..=9` : except genesis, every block has a tx/receipt
294    /// * `10..=19`: no txs/receipts
295    /// * `20..=29`: only one tx/receipt
296    fn setup_tx_based_scenario(
297        sf_rw: &StaticFileProvider<EthPrimitives>,
298        segment: StaticFileSegment,
299        blocks_per_file: u64,
300    ) {
301        fn setup_block_ranges(
302            writer: &mut StaticFileProviderRWRefMut<'_, EthPrimitives>,
303            sf_rw: &StaticFileProvider<EthPrimitives>,
304            segment: StaticFileSegment,
305            block_range: &Range<u64>,
306            mut tx_count: u64,
307            next_tx_num: &mut u64,
308        ) {
309            let mut receipt = Receipt::default();
310            let mut tx = TxLegacy::default();
311
312            for block in block_range.clone() {
313                writer.increment_block(block).unwrap();
314
315                // Append transaction/receipt if there's still a transaction count to append
316                if tx_count > 0 {
317                    match segment {
318                        StaticFileSegment::Headers => panic!("non tx based segment"),
319                        StaticFileSegment::Transactions => {
320                            // Used as ID for validation
321                            tx.nonce = *next_tx_num;
322                            let tx: TransactionSigned =
323                                tx.clone().into_signed(Signature::test_signature()).into();
324                            writer.append_transaction(*next_tx_num, &tx).unwrap();
325                        }
326                        StaticFileSegment::Receipts => {
327                            // Used as ID for validation
328                            receipt.cumulative_gas_used = *next_tx_num;
329                            writer.append_receipt(*next_tx_num, &receipt).unwrap();
330                        }
331                        StaticFileSegment::TransactionSenders => {
332                            // Used as ID for validation
333                            let sender = Address::from(U160::from(*next_tx_num));
334                            writer.append_transaction_sender(*next_tx_num, &sender).unwrap();
335                        }
336                    }
337                    *next_tx_num += 1;
338                    tx_count -= 1;
339                }
340            }
341            writer.commit().unwrap();
342
343            // Calculate expected values based on the range and transactions
344            let expected_block = block_range.end - 1;
345            let expected_tx = if tx_count == 0 { *next_tx_num - 1 } else { *next_tx_num };
346
347            // Perform assertions after processing the blocks
348            assert_eq!(sf_rw.get_highest_static_file_block(segment), Some(expected_block),);
349            assert_eq!(sf_rw.get_highest_static_file_tx(segment), Some(expected_tx),);
350        }
351
352        // Define the block ranges and transaction counts as vectors
353        let block_ranges = [
354            0..blocks_per_file,
355            blocks_per_file..blocks_per_file * 2,
356            blocks_per_file * 2..blocks_per_file * 3,
357        ];
358
359        let tx_counts = [
360            blocks_per_file - 1, // First range: tx per block except genesis
361            0,                   // Second range: no transactions
362            1,                   // Third range: 1 transaction in the second block
363        ];
364
365        let mut writer = sf_rw.latest_writer(segment).unwrap();
366        let mut next_tx_num = 0;
367
368        // Loop through setup scenarios
369        for (block_range, tx_count) in block_ranges.iter().zip(tx_counts.iter()) {
370            setup_block_ranges(
371                &mut writer,
372                sf_rw,
373                segment,
374                block_range,
375                *tx_count,
376                &mut next_tx_num,
377            );
378        }
379
380        // Ensure that scenario was properly setup
381        let expected_tx_ranges = vec![
382            Some(SegmentRangeInclusive::new(0, 8)),
383            None,
384            Some(SegmentRangeInclusive::new(9, 9)),
385        ];
386
387        block_ranges.iter().zip(expected_tx_ranges).for_each(|(block_range, expected_tx_range)| {
388            assert_eq!(
389                sf_rw
390                    .get_segment_provider_for_block(segment, block_range.start, None)
391                    .unwrap()
392                    .user_header()
393                    .tx_range(),
394                expected_tx_range
395            );
396        });
397
398        // Ensure transaction index
399        let expected_tx_index = BTreeMap::from([
400            (8, SegmentRangeInclusive::new(0, 9)),
401            (9, SegmentRangeInclusive::new(20, 29)),
402        ]);
403        assert_eq!(
404            sf_rw.tx_index(segment),
405            (!expected_tx_index.is_empty()).then_some(expected_tx_index),
406            "tx index mismatch",
407        );
408    }
409
410    #[test]
411    fn test_tx_based_truncation() {
412        let segments = [StaticFileSegment::Transactions, StaticFileSegment::Receipts];
413        let blocks_per_file = 10; // Number of blocks per file
414        let files_per_range = 3; // Number of files per range (data/conf/offset files)
415        let file_set_count = 3; // Number of sets of files to create
416        let initial_file_count = files_per_range * file_set_count;
417
418        #[expect(clippy::too_many_arguments)]
419        fn prune_and_validate(
420            sf_rw: &StaticFileProvider<EthPrimitives>,
421            static_dir: impl AsRef<Path>,
422            segment: StaticFileSegment,
423            prune_count: u64,
424            last_block: u64,
425            expected_tx_tip: Option<u64>,
426            expected_file_count: i32,
427            expected_tx_index: BTreeMap<TxNumber, SegmentRangeInclusive>,
428        ) -> eyre::Result<()> {
429            let mut writer = sf_rw.latest_writer(segment)?;
430
431            // Prune transactions or receipts based on the segment type
432            match segment {
433                StaticFileSegment::Headers => panic!("non tx based segment"),
434                StaticFileSegment::Transactions => {
435                    writer.prune_transactions(prune_count, last_block)?
436                }
437                StaticFileSegment::Receipts => writer.prune_receipts(prune_count, last_block)?,
438                StaticFileSegment::TransactionSenders => {
439                    writer.prune_transaction_senders(prune_count, last_block)?
440                }
441            }
442            writer.commit()?;
443
444            // Verify the highest block and transaction tips
445            assert_eyre(
446                sf_rw.get_highest_static_file_block(segment),
447                Some(last_block),
448                "block mismatch",
449            )?;
450            assert_eyre(sf_rw.get_highest_static_file_tx(segment), expected_tx_tip, "tx mismatch")?;
451
452            // Verify that transactions and receipts are returned correctly. Uses
453            // cumulative_gas_used & nonce as ids.
454            if let Some(id) = expected_tx_tip {
455                match segment {
456                    StaticFileSegment::Headers => panic!("non tx based segment"),
457                    StaticFileSegment::Transactions => assert_eyre(
458                        expected_tx_tip,
459                        sf_rw.transaction_by_id(id)?.map(|t| t.nonce()),
460                        "tx mismatch",
461                    )?,
462                    StaticFileSegment::Receipts => assert_eyre(
463                        expected_tx_tip,
464                        sf_rw.receipt(id)?.map(|r| r.cumulative_gas_used),
465                        "receipt mismatch",
466                    )?,
467                    StaticFileSegment::TransactionSenders => assert_eyre(
468                        expected_tx_tip,
469                        sf_rw
470                            .transaction_sender(id)?
471                            .map(|s| u64::try_from(U160::from_be_bytes(s.0.into())).unwrap()),
472                        "sender mismatch",
473                    )?,
474                }
475            }
476
477            // Ensure the file count has reduced as expected
478            assert_eyre(
479                count_files_without_lockfile(static_dir)?,
480                expected_file_count as usize,
481                "file count mismatch",
482            )?;
483
484            // Ensure that the inner tx index (max_tx -> block range) is as expected
485            assert_eyre(
486                sf_rw.tx_index(segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
487                (!expected_tx_index.is_empty()).then_some(expected_tx_index),
488                "tx index mismatch",
489            )?;
490
491            Ok(())
492        }
493
494        for segment in segments {
495            let (static_dir, _) = create_test_static_files_dir();
496
497            let sf_rw = StaticFileProviderBuilder::read_write(&static_dir)
498                .expect("Failed to create static file provider builder")
499                .with_blocks_per_file(blocks_per_file)
500                .build()
501                .expect("Failed to build static file provider");
502
503            setup_tx_based_scenario(&sf_rw, segment, blocks_per_file);
504
505            let sf_rw = StaticFileProviderBuilder::read_write(&static_dir)
506                .expect("Failed to create static file provider builder")
507                .with_blocks_per_file(blocks_per_file)
508                .build()
509                .expect("Failed to build static file provider");
510            let highest_tx = sf_rw.get_highest_static_file_tx(segment).unwrap();
511
512            // Test cases
513            // [prune_count, last_block, expected_tx_tip, expected_file_count, expected_tx_index)
514            let test_cases = vec![
515                // Case 0: 20..=29 has only one tx. Prune the only tx of the block range.
516                // It ensures that the file is not deleted even though there are no rows, since the
517                // `last_block` which is passed to the prune method is the first
518                // block of the range.
519                (
520                    1,
521                    blocks_per_file * 2,
522                    Some(highest_tx - 1),
523                    initial_file_count,
524                    BTreeMap::from([(highest_tx - 1, SegmentRangeInclusive::new(0, 9))]),
525                ),
526                // Case 1: 10..=19 has no txs. There are no txes in the whole block range, but want
527                // to unwind to block 9. Ensures that the 20..=29 and 10..=19 files
528                // are deleted.
529                (
530                    0,
531                    blocks_per_file - 1,
532                    Some(highest_tx - 1),
533                    files_per_range,
534                    BTreeMap::from([(highest_tx - 1, SegmentRangeInclusive::new(0, 9))]),
535                ),
536                // Case 2: Prune most txs up to block 1.
537                (
538                    highest_tx - 1,
539                    1,
540                    Some(0),
541                    files_per_range,
542                    BTreeMap::from([(0, SegmentRangeInclusive::new(0, 1))]),
543                ),
544                // Case 3: Prune remaining tx and ensure that file is not deleted.
545                (1, 0, None, files_per_range, BTreeMap::from([])),
546            ];
547
548            // Loop through test cases
549            for (
550                case,
551                (prune_count, last_block, expected_tx_tip, expected_file_count, expected_tx_index),
552            ) in test_cases.into_iter().enumerate()
553            {
554                prune_and_validate(
555                    &sf_rw,
556                    &static_dir,
557                    segment,
558                    prune_count,
559                    last_block,
560                    expected_tx_tip,
561                    expected_file_count,
562                    expected_tx_index,
563                )
564                .map_err(|err| eyre::eyre!("Test case {case}: {err}"))
565                .unwrap();
566            }
567        }
568    }
569
570    /// Returns the number of files in the provided path, excluding ".lock" files.
571    fn count_files_without_lockfile(path: impl AsRef<Path>) -> eyre::Result<usize> {
572        let is_lockfile = |entry: &fs::DirEntry| {
573            entry.path().file_name().map(|name| name == "lock").unwrap_or(false)
574        };
575        let count = fs::read_dir(path)?
576            .filter_map(|entry| entry.ok())
577            .filter(|entry| !is_lockfile(entry))
578            .count();
579
580        Ok(count)
581    }
582
583    #[test]
584    fn test_dynamic_size() -> eyre::Result<()> {
585        let (static_dir, _) = create_test_static_files_dir();
586
587        {
588            let sf_rw = StaticFileProviderBuilder::<EthPrimitives>::read_write(&static_dir)?
589                .with_blocks_per_file(10)
590                .build()?;
591            let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers)?;
592
593            let mut header = Header::default();
594            for num in 0..=15 {
595                header.number = num;
596                header_writer.append_header(&header, &BlockHash::default()).unwrap();
597            }
598            header_writer.commit().unwrap();
599
600            assert_eq!(sf_rw.headers_range(0..=15)?.len(), 16);
601            assert_eq!(
602                sf_rw.expected_block_index(StaticFileSegment::Headers),
603                Some(BTreeMap::from([
604                    (9, SegmentRangeInclusive::new(0, 9)),
605                    (19, SegmentRangeInclusive::new(10, 19))
606                ])),
607            )
608        }
609
610        {
611            let sf_rw = StaticFileProviderBuilder::<EthPrimitives>::read_write(&static_dir)?
612                .with_blocks_per_file(5)
613                .build()?;
614            let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers)?;
615
616            let mut header = Header::default();
617            for num in 16..=22 {
618                header.number = num;
619                header_writer.append_header(&header, &BlockHash::default()).unwrap();
620            }
621            header_writer.commit().unwrap();
622
623            assert_eq!(sf_rw.headers_range(0..=22)?.len(), 23);
624            assert_eq!(
625                sf_rw.expected_block_index(StaticFileSegment::Headers),
626                Some(BTreeMap::from([
627                    (9, SegmentRangeInclusive::new(0, 9)),
628                    (19, SegmentRangeInclusive::new(10, 19)),
629                    (24, SegmentRangeInclusive::new(20, 24))
630                ]))
631            )
632        }
633
634        {
635            let sf_rw = StaticFileProviderBuilder::<EthPrimitives>::read_write(&static_dir)?
636                .with_blocks_per_file(15)
637                .build()?;
638            let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers)?;
639
640            let mut header = Header::default();
641            for num in 23..=40 {
642                header.number = num;
643                header_writer.append_header(&header, &BlockHash::default()).unwrap();
644            }
645            header_writer.commit().unwrap();
646
647            assert_eq!(sf_rw.headers_range(0..=40)?.len(), 41);
648            assert_eq!(
649                sf_rw.expected_block_index(StaticFileSegment::Headers),
650                Some(BTreeMap::from([
651                    (9, SegmentRangeInclusive::new(0, 9)),
652                    (19, SegmentRangeInclusive::new(10, 19)),
653                    (24, SegmentRangeInclusive::new(20, 24)),
654                    (39, SegmentRangeInclusive::new(25, 39)),
655                    (54, SegmentRangeInclusive::new(40, 54))
656                ]))
657            )
658        }
659
660        Ok(())
661    }
662}