reth_provider/providers/static_file/
mod.rs

1mod manager;
2pub use manager::{StaticFileAccess, StaticFileProvider, StaticFileWriter};
3
4mod jar;
5pub use jar::StaticFileJarProvider;
6
7mod writer;
8pub use writer::{StaticFileProviderRW, StaticFileProviderRWRefMut};
9
10mod metrics;
11use reth_nippy_jar::NippyJar;
12use reth_static_file_types::{SegmentHeader, StaticFileSegment};
13use reth_storage_errors::provider::{ProviderError, ProviderResult};
14use std::{ops::Deref, sync::Arc};
15
16/// Alias type for each specific `NippyJar`.
17type LoadedJarRef<'a> = dashmap::mapref::one::Ref<'a, (u64, StaticFileSegment), LoadedJar>;
18
19/// Helper type to reuse an associated static file mmap handle on created cursors.
20#[derive(Debug)]
21pub struct LoadedJar {
22    jar: NippyJar<SegmentHeader>,
23    mmap_handle: Arc<reth_nippy_jar::DataReader>,
24}
25
26impl LoadedJar {
27    fn new(jar: NippyJar<SegmentHeader>) -> ProviderResult<Self> {
28        match jar.open_data_reader() {
29            Ok(data_reader) => {
30                let mmap_handle = Arc::new(data_reader);
31                Ok(Self { jar, mmap_handle })
32            }
33            Err(e) => Err(ProviderError::other(e)),
34        }
35    }
36
37    /// Returns a clone of the mmap handle that can be used to instantiate a cursor.
38    fn mmap_handle(&self) -> Arc<reth_nippy_jar::DataReader> {
39        self.mmap_handle.clone()
40    }
41
42    const fn segment(&self) -> StaticFileSegment {
43        self.jar.user_header().segment()
44    }
45}
46
47impl Deref for LoadedJar {
48    type Target = NippyJar<SegmentHeader>;
49    fn deref(&self) -> &Self::Target {
50        &self.jar
51    }
52}
53
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        test_utils::create_test_provider_factory, HeaderProvider, StaticFileProviderFactory,
    };
    use alloy_consensus::{Header, SignableTransaction, Transaction, TxLegacy};
    use alloy_primitives::{BlockHash, Signature, TxNumber, B256, U256};
    use rand::seq::SliceRandom;
    use reth_db::test_utils::create_test_static_files_dir;
    use reth_db_api::{
        transaction::DbTxMut, CanonicalHeaders, HeaderNumbers, HeaderTerminalDifficulties, Headers,
    };
    use reth_ethereum_primitives::{EthPrimitives, Receipt, TransactionSigned};
    use reth_static_file_types::{
        find_fixed_range, SegmentRangeInclusive, DEFAULT_BLOCKS_PER_STATIC_FILE,
    };
    use reth_storage_api::{ReceiptProvider, TransactionsProvider};
    use reth_testing_utils::generators::{self, random_header_range};
    use std::{fmt::Debug, fs, ops::Range, path::Path};

    /// Non-panicking equality check: returns an error carrying `msg` and both values when
    /// `got != expected`, so callers can attach extra context (e.g. the test case index)
    /// before unwrapping.
    fn assert_eyre<T: PartialEq + Debug>(got: T, expected: T, msg: &str) -> eyre::Result<()> {
        if got != expected {
            eyre::bail!("{msg} | got: {got:?} expected: {expected:?}");
        }
        Ok(())
    }

    /// Writes the same random header range to the database and to a headers static file, then
    /// checks that both providers return identical headers and terminal difficulties.
    #[test]
    fn test_snap() {
        // Ranges
        let row_count = 100u64;
        let range = 0..=(row_count - 1);

        // Data sources
        let factory = create_test_provider_factory();
        let static_files_path = tempfile::tempdir().unwrap();
        let static_file = static_files_path.path().join(
            StaticFileSegment::Headers
                .filename(&find_fixed_range(*range.end(), DEFAULT_BLOCKS_PER_STATIC_FILE)),
        );

        // Setup data
        let mut headers = random_header_range(
            &mut generators::rng(),
            *range.start()..(*range.end() + 1),
            B256::random(),
        );

        let mut provider_rw = factory.provider_rw().unwrap();
        let tx = provider_rw.tx_mut();
        let mut td = U256::ZERO;
        for header in headers.clone() {
            td += header.header().difficulty;
            let hash = header.hash();

            tx.put::<CanonicalHeaders>(header.number, hash).unwrap();
            tx.put::<Headers>(header.number, header.clone_header()).unwrap();
            tx.put::<HeaderTerminalDifficulties>(header.number, td.into()).unwrap();
            tx.put::<HeaderNumbers>(hash, header.number).unwrap();
        }
        provider_rw.commit().unwrap();

        // Create StaticFile
        {
            let manager = factory.static_file_provider();
            let mut writer = manager.latest_writer(StaticFileSegment::Headers).unwrap();
            let mut td = U256::ZERO;

            for header in headers.clone() {
                td += header.header().difficulty;
                let hash = header.hash();
                writer.append_header(&header.unseal(), td, &hash).unwrap();
            }
            writer.commit().unwrap();
        }

        // Use providers to query Header data and compare if it matches
        {
            let db_provider = factory.provider().unwrap();
            let manager = db_provider.static_file_provider();
            let jar_provider = manager
                .get_segment_provider_from_block(StaticFileSegment::Headers, 0, Some(&static_file))
                .unwrap();

            assert!(!headers.is_empty());

            // Shuffled for chaos.
            headers.shuffle(&mut generators::rng());

            for header in headers {
                let header_hash = header.hash();
                let header = header.unseal();

                // Compare Header
                assert_eq!(header, db_provider.header(&header_hash).unwrap().unwrap());
                assert_eq!(header, jar_provider.header_by_number(header.number).unwrap().unwrap());

                // Compare HeaderTerminalDifficulties
                assert_eq!(
                    db_provider.header_td(&header_hash).unwrap().unwrap(),
                    jar_provider.header_td_by_number(header.number).unwrap().unwrap()
                );
            }
        }
    }

    /// Fills three header files, then prunes step by step and checks the highest block, the
    /// header at the tip, and the number of files left on disk after each step.
    #[test]
    fn test_header_truncation() {
        let (static_dir, _) = create_test_static_files_dir();

        let blocks_per_file = 10; // Number of headers per file
        let files_per_range = 3; // Number of files per range (data/conf/offset files)
        let file_set_count = 3; // Number of sets of files to create
        let initial_file_count = files_per_range * file_set_count;
        let tip = blocks_per_file * file_set_count - 1; // Initial highest block (29 in this case)

        // [ Headers Creation and Commit ]
        {
            let sf_rw = StaticFileProvider::<EthPrimitives>::read_write(&static_dir)
                .expect("Failed to create static file provider")
                .with_custom_blocks_per_file(blocks_per_file);

            let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap();

            // Append headers from 0 to the tip (29) and commit
            let mut header = Header::default();
            for num in 0..=tip {
                header.number = num;
                header_writer
                    .append_header(&header, U256::default(), &BlockHash::default())
                    .unwrap();
            }
            header_writer.commit().unwrap();
        }

        /// Prunes `prune_count` headers, commits, and validates the resulting tip, the header
        /// stored at that tip, and the number of files remaining in `static_dir`.
        fn prune_and_validate(
            writer: &mut StaticFileProviderRWRefMut<'_, EthPrimitives>,
            sf_rw: &StaticFileProvider<EthPrimitives>,
            static_dir: impl AsRef<Path>,
            prune_count: u64,
            expected_tip: Option<u64>,
            expected_file_count: u64,
        ) -> eyre::Result<()> {
            writer.prune_headers(prune_count)?;
            writer.commit()?;

            // Validate the highest block after pruning
            assert_eyre(
                sf_rw.get_highest_static_file_block(StaticFileSegment::Headers),
                expected_tip,
                "block mismatch",
            )?;

            if let Some(id) = expected_tip {
                assert_eyre(
                    sf_rw.header_by_number(id)?.map(|h| h.number),
                    expected_tip,
                    "header mismatch",
                )?;
            }

            // Validate the number of files remaining in the directory
            assert_eyre(
                count_files_without_lockfile(static_dir)?,
                expected_file_count as usize,
                "file count mismatch",
            )?;

            Ok(())
        }

        // [ Test Cases ]
        type PruneCount = u64;
        type ExpectedTip = u64;
        type ExpectedFileCount = u64;
        // Running tip, decremented as each case below is constructed in order.
        let mut tmp_tip = tip;
        let test_cases: Vec<(PruneCount, Option<ExpectedTip>, ExpectedFileCount)> = vec![
            // Case 0: Pruning 1 header
            {
                tmp_tip -= 1;
                (1, Some(tmp_tip), initial_file_count)
            },
            // Case 1: Pruning remaining rows from file should result in its deletion
            {
                tmp_tip -= blocks_per_file - 1;
                (blocks_per_file - 1, Some(tmp_tip), initial_file_count - files_per_range)
            },
            // Case 2: Pruning more headers than a single file has (tip reduced by
            // blocks_per_file + 1) should result in a file set deletion
            {
                tmp_tip -= blocks_per_file + 1;
                (blocks_per_file + 1, Some(tmp_tip), initial_file_count - files_per_range * 2)
            },
            // Case 3: Pruning all remaining headers from the file except the genesis header
            {
                (
                    tmp_tip,
                    Some(0),         // Only genesis block remains
                    files_per_range, // The file set with block 0 should remain
                )
            },
            // Case 4: Pruning the genesis header (should not delete the file set with block 0)
            {
                (
                    1,
                    None,            // No blocks left
                    files_per_range, // The file set with block 0 remains
                )
            },
        ];

        // Test cases execution
        {
            let sf_rw = StaticFileProvider::read_write(&static_dir)
                .expect("Failed to create static file provider")
                .with_custom_blocks_per_file(blocks_per_file);

            assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(tip));
            assert_eq!(
                count_files_without_lockfile(static_dir.as_ref()).unwrap(),
                initial_file_count as usize
            );

            let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap();

            for (case, (prune_count, expected_tip, expected_file_count)) in
                test_cases.into_iter().enumerate()
            {
                prune_and_validate(
                    &mut header_writer,
                    &sf_rw,
                    &static_dir,
                    prune_count,
                    expected_tip,
                    expected_file_count,
                )
                .map_err(|err| eyre::eyre!("Test case {case}: {err}"))
                .unwrap();
            }
        }
    }

    /// 3 block ranges are built
    ///
    /// for `blocks_per_file = 10`:
    /// * `0..=9` : every block except the last one (block 9) has a tx/receipt
    /// * `10..=19`: no txs/receipts
    /// * `20..=29`: only one tx/receipt, appended in the first block of the range (block 20)
    ///
    /// The closing assertions hard-code tx ranges and index entries for `blocks_per_file = 10`.
    fn setup_tx_based_scenario(
        sf_rw: &StaticFileProvider<EthPrimitives>,
        segment: StaticFileSegment,
        blocks_per_file: u64,
    ) {
        /// Appends every block of `block_range`, attaching one tx/receipt per block (starting
        /// with the first block) until `tx_count` is used up, then commits and checks the
        /// highest block and tx reported by `sf_rw`.
        fn setup_block_ranges(
            writer: &mut StaticFileProviderRWRefMut<'_, EthPrimitives>,
            sf_rw: &StaticFileProvider<EthPrimitives>,
            segment: StaticFileSegment,
            block_range: &Range<u64>,
            mut tx_count: u64,
            next_tx_num: &mut u64,
        ) {
            let mut receipt = Receipt::default();
            let mut tx = TxLegacy::default();

            for block in block_range.clone() {
                writer.increment_block(block).unwrap();

                // Append transaction/receipt if there's still a transaction count to append
                if tx_count > 0 {
                    if segment.is_receipts() {
                        // Used as ID for validation
                        receipt.cumulative_gas_used = *next_tx_num;
                        writer.append_receipt(*next_tx_num, &receipt).unwrap();
                    } else {
                        // Used as ID for validation
                        tx.nonce = *next_tx_num;
                        let tx: TransactionSigned =
                            tx.clone().into_signed(Signature::test_signature()).into();
                        writer.append_transaction(*next_tx_num, &tx).unwrap();
                    }
                    *next_tx_num += 1;
                    tx_count -= 1;
                }
            }
            writer.commit().unwrap();

            // Calculate expected values based on the range and transactions
            let expected_block = block_range.end - 1;
            let expected_tx = if tx_count == 0 { *next_tx_num - 1 } else { *next_tx_num };

            // Perform assertions after processing the blocks
            assert_eq!(sf_rw.get_highest_static_file_block(segment), Some(expected_block));
            assert_eq!(sf_rw.get_highest_static_file_tx(segment), Some(expected_tx));
        }

        // Define the block ranges and transaction counts
        let block_ranges = [
            0..blocks_per_file,
            blocks_per_file..blocks_per_file * 2,
            blocks_per_file * 2..blocks_per_file * 3,
        ];

        let tx_counts = [
            blocks_per_file - 1, // First range: one tx per block, all but the last block
            0,                   // Second range: no transactions
            1,                   // Third range: 1 transaction in the first block
        ];

        let mut writer = sf_rw.latest_writer(segment).unwrap();
        let mut next_tx_num = 0;

        // Loop through setup scenarios
        for (block_range, tx_count) in block_ranges.iter().zip(tx_counts.iter()) {
            setup_block_ranges(
                &mut writer,
                sf_rw,
                segment,
                block_range,
                *tx_count,
                &mut next_tx_num,
            );
        }

        // Ensure that scenario was properly setup
        let expected_tx_ranges = vec![
            Some(SegmentRangeInclusive::new(0, 8)),
            None,
            Some(SegmentRangeInclusive::new(9, 9)),
        ];

        block_ranges.iter().zip(expected_tx_ranges).for_each(|(block_range, expected_tx_range)| {
            assert_eq!(
                sf_rw
                    .get_segment_provider_from_block(segment, block_range.start, None)
                    .unwrap()
                    .user_header()
                    .tx_range(),
                expected_tx_range.as_ref()
            );
        });

        // Ensure transaction index
        let tx_index = sf_rw.tx_index().read();
        let expected_tx_index =
            vec![(8, SegmentRangeInclusive::new(0, 9)), (9, SegmentRangeInclusive::new(20, 29))];
        assert_eq!(
            tx_index.get(&segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
            Some(expected_tx_index),
            "tx index mismatch",
        );
    }

    /// Builds the tx-based scenario for both the transactions and the receipts segment, then
    /// prunes step by step and validates tips, stored rows, file count and the tx index.
    #[test]
    fn test_tx_based_truncation() {
        let segments = [StaticFileSegment::Transactions, StaticFileSegment::Receipts];
        let blocks_per_file = 10; // Number of blocks per file
        let files_per_range = 3; // Number of files per range (data/conf/offset files)
        let file_set_count = 3; // Number of sets of files to create
        let initial_file_count = files_per_range * file_set_count;

        /// Prunes `prune_count` rows of `segment` down to `last_block`, commits, and validates
        /// the highest block/tx, the row stored at the tx tip, the number of files in
        /// `static_dir`, and the provider's tx index (an empty `expected_tx_index` means the
        /// segment must have no index entry at all).
        #[expect(clippy::too_many_arguments)]
        fn prune_and_validate(
            sf_rw: &StaticFileProvider<EthPrimitives>,
            static_dir: impl AsRef<Path>,
            segment: StaticFileSegment,
            prune_count: u64,
            last_block: u64,
            expected_tx_tip: Option<u64>,
            expected_file_count: usize,
            expected_tx_index: Vec<(TxNumber, SegmentRangeInclusive)>,
        ) -> eyre::Result<()> {
            let mut writer = sf_rw.latest_writer(segment)?;

            // Prune transactions or receipts based on the segment type
            if segment.is_receipts() {
                writer.prune_receipts(prune_count, last_block)?;
            } else {
                writer.prune_transactions(prune_count, last_block)?;
            }
            writer.commit()?;

            // Verify the highest block and transaction tips
            assert_eyre(
                sf_rw.get_highest_static_file_block(segment),
                Some(last_block),
                "block mismatch",
            )?;
            assert_eyre(sf_rw.get_highest_static_file_tx(segment), expected_tx_tip, "tx mismatch")?;

            // Verify that transactions and receipts are returned correctly. Uses
            // cumulative_gas_used & nonce as ids.
            if let Some(id) = expected_tx_tip {
                // `assert_eyre` takes (got, expected): the value read back goes first so that a
                // failure message labels both sides correctly.
                if segment.is_receipts() {
                    assert_eyre(
                        sf_rw.receipt(id)?.map(|r| r.cumulative_gas_used),
                        expected_tx_tip,
                        "tx mismatch",
                    )?;
                } else {
                    assert_eyre(
                        sf_rw.transaction_by_id(id)?.map(|t| t.nonce()),
                        expected_tx_tip,
                        "tx mismatch",
                    )?;
                }
            }

            // Ensure the file count has reduced as expected
            assert_eyre(
                count_files_without_lockfile(static_dir)?,
                expected_file_count,
                "file count mismatch",
            )?;

            // Ensure that the inner tx index (max_tx -> block range) is as expected
            let tx_index = sf_rw.tx_index().read();
            assert_eyre(
                tx_index.get(&segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
                (!expected_tx_index.is_empty()).then_some(expected_tx_index),
                "tx index mismatch",
            )?;

            Ok(())
        }

        for segment in segments {
            let (static_dir, _) = create_test_static_files_dir();

            let sf_rw = StaticFileProvider::read_write(&static_dir)
                .expect("Failed to create static file provider")
                .with_custom_blocks_per_file(blocks_per_file);

            setup_tx_based_scenario(&sf_rw, segment, blocks_per_file);

            let sf_rw = StaticFileProvider::read_write(&static_dir)
                .expect("Failed to create static file provider")
                .with_custom_blocks_per_file(blocks_per_file);
            let highest_tx = sf_rw.get_highest_static_file_tx(segment).unwrap();

            // Test cases
            // (prune_count, last_block, expected_tx_tip, expected_file_count, expected_tx_index)
            let test_cases = vec![
                // Case 0: 20..=29 has only one tx. Prune the only tx of the block range.
                // It ensures that the file is not deleted even though there are no rows, since the
                // `last_block` which is passed to the prune method is the first
                // block of the range.
                (
                    1,
                    blocks_per_file * 2,
                    Some(highest_tx - 1),
                    initial_file_count,
                    vec![(highest_tx - 1, SegmentRangeInclusive::new(0, 9))],
                ),
                // Case 1: 10..=19 has no txs. There are no txes in the whole block range, but want
                // to unwind to block 9. Ensures that the 20..=29 and 10..=19 files
                // are deleted.
                (
                    0,
                    blocks_per_file - 1,
                    Some(highest_tx - 1),
                    files_per_range,
                    vec![(highest_tx - 1, SegmentRangeInclusive::new(0, 9))],
                ),
                // Case 2: Prune most txs up to block 1.
                (
                    highest_tx - 1,
                    1,
                    Some(0),
                    files_per_range,
                    vec![(0, SegmentRangeInclusive::new(0, 1))],
                ),
                // Case 3: Prune remaining tx and ensure that file is not deleted.
                (1, 0, None, files_per_range, vec![]),
            ];

            // Loop through test cases
            for (
                case,
                (prune_count, last_block, expected_tx_tip, expected_file_count, expected_tx_index),
            ) in test_cases.into_iter().enumerate()
            {
                prune_and_validate(
                    &sf_rw,
                    &static_dir,
                    segment,
                    prune_count,
                    last_block,
                    expected_tx_tip,
                    expected_file_count,
                    expected_tx_index,
                )
                .map_err(|err| eyre::eyre!("Test case {case}: {err}"))
                .unwrap();
            }
        }
    }

    /// Returns the number of entries in the directory at `path`, not counting any entry whose
    /// file name is exactly `lock`.
    ///
    /// # Errors
    ///
    /// Fails if the directory cannot be read or if any of its entries cannot be read.
    fn count_files_without_lockfile(path: impl AsRef<Path>) -> eyre::Result<usize> {
        let mut count = 0;
        for entry in fs::read_dir(path)? {
            // Surface I/O errors rather than skipping the entry, which would silently skew the
            // count the callers assert on.
            let entry = entry?;
            if entry.file_name() != "lock" {
                count += 1;
            }
        }

        Ok(count)
    }
}
565}