reth_provider/providers/static_file/
mod.rs

1mod manager;
2pub use manager::{StaticFileAccess, StaticFileProvider, StaticFileWriter};
3
4mod jar;
5pub use jar::StaticFileJarProvider;
6
7mod writer;
8pub use writer::{StaticFileProviderRW, StaticFileProviderRWRefMut};
9
10mod metrics;
11use reth_nippy_jar::NippyJar;
12use reth_static_file_types::{SegmentHeader, StaticFileSegment};
13use reth_storage_errors::provider::{ProviderError, ProviderResult};
14use std::{ops::Deref, sync::Arc};
15
/// Shared reference to a [`LoadedJar`] entry held in a `dashmap` map keyed by
/// `(u64, StaticFileSegment)`.
// NOTE(review): the `u64` half of the key presumably identifies the jar's block range — confirm
// against the map owner.
type LoadedJarRef<'a> = dashmap::mapref::one::Ref<'a, (u64, StaticFileSegment), LoadedJar>;
18
/// Helper type to reuse an associated static file mmap handle on created cursors.
///
/// Pairs a [`NippyJar`] with a shared data reader opened from it, so the reader can be handed
/// out repeatedly instead of being reopened.
#[derive(Debug)]
pub struct LoadedJar {
    /// The static file jar, carrying a [`SegmentHeader`] as its user header.
    jar: NippyJar<SegmentHeader>,
    /// Shared data reader opened from `jar` when this value is constructed.
    mmap_handle: Arc<reth_nippy_jar::DataReader>,
}
25
26impl LoadedJar {
27    fn new(jar: NippyJar<SegmentHeader>) -> ProviderResult<Self> {
28        match jar.open_data_reader() {
29            Ok(data_reader) => {
30                let mmap_handle = Arc::new(data_reader);
31                Ok(Self { jar, mmap_handle })
32            }
33            Err(e) => Err(ProviderError::other(e)),
34        }
35    }
36
37    /// Returns a clone of the mmap handle that can be used to instantiate a cursor.
38    fn mmap_handle(&self) -> Arc<reth_nippy_jar::DataReader> {
39        self.mmap_handle.clone()
40    }
41
42    const fn segment(&self) -> StaticFileSegment {
43        self.jar.user_header().segment()
44    }
45}
46
/// Lets a [`LoadedJar`] be used wherever a `&NippyJar<SegmentHeader>` is expected.
impl Deref for LoadedJar {
    type Target = NippyJar<SegmentHeader>;
    /// Borrows the wrapped `jar`.
    fn deref(&self) -> &Self::Target {
        &self.jar
    }
}
53
54#[cfg(test)]
55mod tests {
56    use super::*;
57    use crate::{
58        test_utils::create_test_provider_factory, HeaderProvider, StaticFileProviderFactory,
59    };
60    use alloy_consensus::{Header, SignableTransaction, Transaction, TxLegacy};
61    use alloy_primitives::{BlockHash, Signature, TxNumber, B256};
62    use rand::seq::SliceRandom;
63    use reth_db::test_utils::create_test_static_files_dir;
64    use reth_db_api::{transaction::DbTxMut, CanonicalHeaders, HeaderNumbers, Headers};
65    use reth_ethereum_primitives::{EthPrimitives, Receipt, TransactionSigned};
66    use reth_static_file_types::{
67        find_fixed_range, SegmentRangeInclusive, DEFAULT_BLOCKS_PER_STATIC_FILE,
68    };
69    use reth_storage_api::{ReceiptProvider, TransactionsProvider};
70    use reth_testing_utils::generators::{self, random_header_range};
71    use std::{fmt::Debug, fs, ops::Range, path::Path};
72
73    fn assert_eyre<T: PartialEq + Debug>(got: T, expected: T, msg: &str) -> eyre::Result<()> {
74        if got != expected {
75            eyre::bail!("{msg} | got: {got:?} expected: {expected:?}");
76        }
77        Ok(())
78    }
79
80    #[test]
81    fn test_snap() {
82        // Ranges
83        let row_count = 100u64;
84        let range = 0..=(row_count - 1);
85
86        // Data sources
87        let factory = create_test_provider_factory();
88        let static_files_path = tempfile::tempdir().unwrap();
89        let static_file = static_files_path.path().join(
90            StaticFileSegment::Headers
91                .filename(&find_fixed_range(*range.end(), DEFAULT_BLOCKS_PER_STATIC_FILE)),
92        );
93
94        // Setup data
95        let mut headers = random_header_range(
96            &mut generators::rng(),
97            *range.start()..(*range.end() + 1),
98            B256::random(),
99        );
100
101        let mut provider_rw = factory.provider_rw().unwrap();
102        let tx = provider_rw.tx_mut();
103        for header in headers.clone() {
104            let hash = header.hash();
105
106            tx.put::<CanonicalHeaders>(header.number, hash).unwrap();
107            tx.put::<Headers>(header.number, header.clone_header()).unwrap();
108            tx.put::<HeaderNumbers>(hash, header.number).unwrap();
109        }
110        provider_rw.commit().unwrap();
111
112        // Create StaticFile
113        {
114            let manager = factory.static_file_provider();
115            let mut writer = manager.latest_writer(StaticFileSegment::Headers).unwrap();
116
117            for header in headers.clone() {
118                let hash = header.hash();
119                writer.append_header(&header.unseal(), &hash).unwrap();
120            }
121            writer.commit().unwrap();
122        }
123
124        // Use providers to query Header data and compare if it matches
125        {
126            let db_provider = factory.provider().unwrap();
127            let manager = db_provider.static_file_provider();
128            let jar_provider = manager
129                .get_segment_provider_from_block(StaticFileSegment::Headers, 0, Some(&static_file))
130                .unwrap();
131
132            assert!(!headers.is_empty());
133
134            // Shuffled for chaos.
135            headers.shuffle(&mut generators::rng());
136
137            for header in headers {
138                let header_hash = header.hash();
139                let header = header.unseal();
140
141                // Compare Header
142                assert_eq!(header, db_provider.header(header_hash).unwrap().unwrap());
143                assert_eq!(header, jar_provider.header_by_number(header.number).unwrap().unwrap());
144            }
145        }
146    }
147
148    #[test]
149    fn test_header_truncation() {
150        let (static_dir, _) = create_test_static_files_dir();
151
152        let blocks_per_file = 10; // Number of headers per file
153        let files_per_range = 3; // Number of files per range (data/conf/offset files)
154        let file_set_count = 3; // Number of sets of files to create
155        let initial_file_count = files_per_range * file_set_count;
156        let tip = blocks_per_file * file_set_count - 1; // Initial highest block (29 in this case)
157
158        // [ Headers Creation and Commit ]
159        {
160            let sf_rw = StaticFileProvider::<EthPrimitives>::read_write(&static_dir)
161                .expect("Failed to create static file provider")
162                .with_custom_blocks_per_file(blocks_per_file);
163
164            let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap();
165
166            // Append headers from 0 to the tip (29) and commit
167            let mut header = Header::default();
168            for num in 0..=tip {
169                header.number = num;
170                header_writer.append_header(&header, &BlockHash::default()).unwrap();
171            }
172            header_writer.commit().unwrap();
173        }
174
175        // Helper function to prune headers and validate truncation results
176        fn prune_and_validate(
177            writer: &mut StaticFileProviderRWRefMut<'_, EthPrimitives>,
178            sf_rw: &StaticFileProvider<EthPrimitives>,
179            static_dir: impl AsRef<Path>,
180            prune_count: u64,
181            expected_tip: Option<u64>,
182            expected_file_count: u64,
183        ) -> eyre::Result<()> {
184            writer.prune_headers(prune_count)?;
185            writer.commit()?;
186
187            // Validate the highest block after pruning
188            assert_eyre(
189                sf_rw.get_highest_static_file_block(StaticFileSegment::Headers),
190                expected_tip,
191                "block mismatch",
192            )?;
193
194            if let Some(id) = expected_tip {
195                assert_eyre(
196                    sf_rw.header_by_number(id)?.map(|h| h.number),
197                    expected_tip,
198                    "header mismatch",
199                )?;
200            }
201
202            // Validate the number of files remaining in the directory
203            assert_eyre(
204                count_files_without_lockfile(static_dir)?,
205                expected_file_count as usize,
206                "file count mismatch",
207            )?;
208
209            Ok(())
210        }
211
212        // [ Test Cases ]
213        type PruneCount = u64;
214        type ExpectedTip = u64;
215        type ExpectedFileCount = u64;
216        let mut tmp_tip = tip;
217        let test_cases: Vec<(PruneCount, Option<ExpectedTip>, ExpectedFileCount)> = vec![
218            // Case 0: Pruning 1 header
219            {
220                tmp_tip -= 1;
221                (1, Some(tmp_tip), initial_file_count)
222            },
223            // Case 1: Pruning remaining rows from file should result in its deletion
224            {
225                tmp_tip -= blocks_per_file - 1;
226                (blocks_per_file - 1, Some(tmp_tip), initial_file_count - files_per_range)
227            },
228            // Case 2: Pruning more headers than a single file has (tip reduced by
229            // blocks_per_file + 1) should result in a file set deletion
230            {
231                tmp_tip -= blocks_per_file + 1;
232                (blocks_per_file + 1, Some(tmp_tip), initial_file_count - files_per_range * 2)
233            },
234            // Case 3: Pruning all remaining headers from the file except the genesis header
235            {
236                (
237                    tmp_tip,
238                    Some(0),         // Only genesis block remains
239                    files_per_range, // The file set with block 0 should remain
240                )
241            },
242            // Case 4: Pruning the genesis header (should not delete the file set with block 0)
243            {
244                (
245                    1,
246                    None,            // No blocks left
247                    files_per_range, // The file set with block 0 remains
248                )
249            },
250        ];
251
252        // Test cases execution
253        {
254            let sf_rw = StaticFileProvider::read_write(&static_dir)
255                .expect("Failed to create static file provider")
256                .with_custom_blocks_per_file(blocks_per_file);
257
258            assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(tip));
259            assert_eq!(
260                count_files_without_lockfile(static_dir.as_ref()).unwrap(),
261                initial_file_count as usize
262            );
263
264            let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap();
265
266            for (case, (prune_count, expected_tip, expected_file_count)) in
267                test_cases.into_iter().enumerate()
268            {
269                prune_and_validate(
270                    &mut header_writer,
271                    &sf_rw,
272                    &static_dir,
273                    prune_count,
274                    expected_tip,
275                    expected_file_count,
276                )
277                .map_err(|err| eyre::eyre!("Test case {case}: {err}"))
278                .unwrap();
279            }
280        }
281    }
282
283    /// 3 block ranges are built
284    ///
285    /// for `blocks_per_file = 10`:
286    /// * `0..=9` : except genesis, every block has a tx/receipt
287    /// * `10..=19`: no txs/receipts
288    /// * `20..=29`: only one tx/receipt
289    fn setup_tx_based_scenario(
290        sf_rw: &StaticFileProvider<EthPrimitives>,
291        segment: StaticFileSegment,
292        blocks_per_file: u64,
293    ) {
294        fn setup_block_ranges(
295            writer: &mut StaticFileProviderRWRefMut<'_, EthPrimitives>,
296            sf_rw: &StaticFileProvider<EthPrimitives>,
297            segment: StaticFileSegment,
298            block_range: &Range<u64>,
299            mut tx_count: u64,
300            next_tx_num: &mut u64,
301        ) {
302            let mut receipt = Receipt::default();
303            let mut tx = TxLegacy::default();
304
305            for block in block_range.clone() {
306                writer.increment_block(block).unwrap();
307
308                // Append transaction/receipt if there's still a transaction count to append
309                if tx_count > 0 {
310                    if segment.is_receipts() {
311                        // Used as ID for validation
312                        receipt.cumulative_gas_used = *next_tx_num;
313                        writer.append_receipt(*next_tx_num, &receipt).unwrap();
314                    } else {
315                        // Used as ID for validation
316                        tx.nonce = *next_tx_num;
317                        let tx: TransactionSigned =
318                            tx.clone().into_signed(Signature::test_signature()).into();
319                        writer.append_transaction(*next_tx_num, &tx).unwrap();
320                    }
321                    *next_tx_num += 1;
322                    tx_count -= 1;
323                }
324            }
325            writer.commit().unwrap();
326
327            // Calculate expected values based on the range and transactions
328            let expected_block = block_range.end - 1;
329            let expected_tx = if tx_count == 0 { *next_tx_num - 1 } else { *next_tx_num };
330
331            // Perform assertions after processing the blocks
332            assert_eq!(sf_rw.get_highest_static_file_block(segment), Some(expected_block),);
333            assert_eq!(sf_rw.get_highest_static_file_tx(segment), Some(expected_tx),);
334        }
335
336        // Define the block ranges and transaction counts as vectors
337        let block_ranges = [
338            0..blocks_per_file,
339            blocks_per_file..blocks_per_file * 2,
340            blocks_per_file * 2..blocks_per_file * 3,
341        ];
342
343        let tx_counts = [
344            blocks_per_file - 1, // First range: tx per block except genesis
345            0,                   // Second range: no transactions
346            1,                   // Third range: 1 transaction in the second block
347        ];
348
349        let mut writer = sf_rw.latest_writer(segment).unwrap();
350        let mut next_tx_num = 0;
351
352        // Loop through setup scenarios
353        for (block_range, tx_count) in block_ranges.iter().zip(tx_counts.iter()) {
354            setup_block_ranges(
355                &mut writer,
356                sf_rw,
357                segment,
358                block_range,
359                *tx_count,
360                &mut next_tx_num,
361            );
362        }
363
364        // Ensure that scenario was properly setup
365        let expected_tx_ranges = vec![
366            Some(SegmentRangeInclusive::new(0, 8)),
367            None,
368            Some(SegmentRangeInclusive::new(9, 9)),
369        ];
370
371        block_ranges.iter().zip(expected_tx_ranges).for_each(|(block_range, expected_tx_range)| {
372            assert_eq!(
373                sf_rw
374                    .get_segment_provider_from_block(segment, block_range.start, None)
375                    .unwrap()
376                    .user_header()
377                    .tx_range(),
378                expected_tx_range.as_ref()
379            );
380        });
381
382        // Ensure transaction index
383        let tx_index = sf_rw.tx_index().read();
384        let expected_tx_index =
385            vec![(8, SegmentRangeInclusive::new(0, 9)), (9, SegmentRangeInclusive::new(20, 29))];
386        assert_eq!(
387            tx_index.get(&segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
388            (!expected_tx_index.is_empty()).then_some(expected_tx_index),
389            "tx index mismatch",
390        );
391    }
392
393    #[test]
394    fn test_tx_based_truncation() {
395        let segments = [StaticFileSegment::Transactions, StaticFileSegment::Receipts];
396        let blocks_per_file = 10; // Number of blocks per file
397        let files_per_range = 3; // Number of files per range (data/conf/offset files)
398        let file_set_count = 3; // Number of sets of files to create
399        let initial_file_count = files_per_range * file_set_count;
400
401        #[expect(clippy::too_many_arguments)]
402        fn prune_and_validate(
403            sf_rw: &StaticFileProvider<EthPrimitives>,
404            static_dir: impl AsRef<Path>,
405            segment: StaticFileSegment,
406            prune_count: u64,
407            last_block: u64,
408            expected_tx_tip: Option<u64>,
409            expected_file_count: i32,
410            expected_tx_index: Vec<(TxNumber, SegmentRangeInclusive)>,
411        ) -> eyre::Result<()> {
412            let mut writer = sf_rw.latest_writer(segment)?;
413
414            // Prune transactions or receipts based on the segment type
415            if segment.is_receipts() {
416                writer.prune_receipts(prune_count, last_block)?;
417            } else {
418                writer.prune_transactions(prune_count, last_block)?;
419            }
420            writer.commit()?;
421
422            // Verify the highest block and transaction tips
423            assert_eyre(
424                sf_rw.get_highest_static_file_block(segment),
425                Some(last_block),
426                "block mismatch",
427            )?;
428            assert_eyre(sf_rw.get_highest_static_file_tx(segment), expected_tx_tip, "tx mismatch")?;
429
430            // Verify that transactions and receipts are returned correctly. Uses
431            // cumulative_gas_used & nonce as ids.
432            if let Some(id) = expected_tx_tip {
433                if segment.is_receipts() {
434                    assert_eyre(
435                        expected_tx_tip,
436                        sf_rw.receipt(id)?.map(|r| r.cumulative_gas_used),
437                        "tx mismatch",
438                    )?;
439                } else {
440                    assert_eyre(
441                        expected_tx_tip,
442                        sf_rw.transaction_by_id(id)?.map(|t| t.nonce()),
443                        "tx mismatch",
444                    )?;
445                }
446            }
447
448            // Ensure the file count has reduced as expected
449            assert_eyre(
450                count_files_without_lockfile(static_dir)?,
451                expected_file_count as usize,
452                "file count mismatch",
453            )?;
454
455            // Ensure that the inner tx index (max_tx -> block range) is as expected
456            let tx_index = sf_rw.tx_index().read();
457            assert_eyre(
458                tx_index.get(&segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
459                (!expected_tx_index.is_empty()).then_some(expected_tx_index),
460                "tx index mismatch",
461            )?;
462
463            Ok(())
464        }
465
466        for segment in segments {
467            let (static_dir, _) = create_test_static_files_dir();
468
469            let sf_rw = StaticFileProvider::read_write(&static_dir)
470                .expect("Failed to create static file provider")
471                .with_custom_blocks_per_file(blocks_per_file);
472
473            setup_tx_based_scenario(&sf_rw, segment, blocks_per_file);
474
475            let sf_rw = StaticFileProvider::read_write(&static_dir)
476                .expect("Failed to create static file provider")
477                .with_custom_blocks_per_file(blocks_per_file);
478            let highest_tx = sf_rw.get_highest_static_file_tx(segment).unwrap();
479
480            // Test cases
481            // [prune_count, last_block, expected_tx_tip, expected_file_count, expected_tx_index)
482            let test_cases = vec![
483                // Case 0: 20..=29 has only one tx. Prune the only tx of the block range.
484                // It ensures that the file is not deleted even though there are no rows, since the
485                // `last_block` which is passed to the prune method is the first
486                // block of the range.
487                (
488                    1,
489                    blocks_per_file * 2,
490                    Some(highest_tx - 1),
491                    initial_file_count,
492                    vec![(highest_tx - 1, SegmentRangeInclusive::new(0, 9))],
493                ),
494                // Case 1: 10..=19 has no txs. There are no txes in the whole block range, but want
495                // to unwind to block 9. Ensures that the 20..=29 and 10..=19 files
496                // are deleted.
497                (
498                    0,
499                    blocks_per_file - 1,
500                    Some(highest_tx - 1),
501                    files_per_range,
502                    vec![(highest_tx - 1, SegmentRangeInclusive::new(0, 9))],
503                ),
504                // Case 2: Prune most txs up to block 1.
505                (
506                    highest_tx - 1,
507                    1,
508                    Some(0),
509                    files_per_range,
510                    vec![(0, SegmentRangeInclusive::new(0, 1))],
511                ),
512                // Case 3: Prune remaining tx and ensure that file is not deleted.
513                (1, 0, None, files_per_range, vec![]),
514            ];
515
516            // Loop through test cases
517            for (
518                case,
519                (prune_count, last_block, expected_tx_tip, expected_file_count, expected_tx_index),
520            ) in test_cases.into_iter().enumerate()
521            {
522                prune_and_validate(
523                    &sf_rw,
524                    &static_dir,
525                    segment,
526                    prune_count,
527                    last_block,
528                    expected_tx_tip,
529                    expected_file_count,
530                    expected_tx_index,
531                )
532                .map_err(|err| eyre::eyre!("Test case {case}: {err}"))
533                .unwrap();
534            }
535        }
536    }
537
538    /// Returns the number of files in the provided path, excluding ".lock" files.
539    fn count_files_without_lockfile(path: impl AsRef<Path>) -> eyre::Result<usize> {
540        let is_lockfile = |entry: &fs::DirEntry| {
541            entry.path().file_name().map(|name| name == "lock").unwrap_or(false)
542        };
543        let count = fs::read_dir(path)?
544            .filter_map(|entry| entry.ok())
545            .filter(|entry| !is_lockfile(entry))
546            .count();
547
548        Ok(count)
549    }
550}