reth_nippy_jar/lib.rs

//! Immutable data store format.
//!
//! *Warning*: The `NippyJar` encoding format and its implementations are
//! designed for storing and retrieving data internally. They are not hardened
//! to safely read potentially malicious data.
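//!
//! A minimal write-then-read sketch, loosely mirroring the tests at the bottom of this file
//! (ignored doctest; the path and values are illustrative, and `freeze_config` is crate-private,
//! so this only runs inside the crate):
//!
//! ```ignore
//! use std::path::Path;
//!
//! // Two columns, no user-defined header.
//! let jar = NippyJar::new_without_header(2, Path::new("/tmp/jar"));
//! jar.freeze_config()?; // persists `/tmp/jar.conf`
//!
//! // Append one full row (one value per column), then flush offsets and config.
//! let mut writer = NippyJarWriter::new(jar)?;
//! writer.append_column(Some(Ok(&vec![1u8; 32])))?;
//! writer.append_column(Some(Ok(&vec![2u8; 32])))?;
//! writer.commit()?;
//!
//! // Read the row back through a cursor.
//! let jar = NippyJar::load_without_header(Path::new("/tmp/jar"))?;
//! let mut cursor = NippyJarCursor::new(&jar)?;
//! let row = cursor.next_row()?.expect("one row");
//! assert_eq!(row[0], &[1u8; 32][..]);
//! ```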

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
    error::Error as StdError,
    fs::File,
    io::Read,
    ops::Range,
    path::{Path, PathBuf},
};
use tracing::*;

/// Compression algorithms supported by `NippyJar`.
pub mod compression;
#[cfg(test)]
use compression::Compression;
use compression::Compressors;

/// Empty enum for backwards compatibility.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum Functions {}

/// Empty enum for backwards compatibility.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum InclusionFilters {}

mod error;
pub use error::NippyJarError;

mod cursor;
pub use cursor::NippyJarCursor;

mod writer;
pub use writer::NippyJarWriter;

mod consistency;
pub use consistency::NippyJarChecker;

/// The version number of the Nippy Jar format.
const NIPPY_JAR_VERSION: usize = 1;
/// The file extension used for index files.
const INDEX_FILE_EXTENSION: &str = "idx";
/// The file extension used for offsets files.
const OFFSETS_FILE_EXTENSION: &str = "off";
/// The file extension used for configuration files.
pub const CONFIG_FILE_EXTENSION: &str = "conf";
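
// E.g. a jar created at `/tmp/headers` (an illustrative path) keeps its satellite files
// alongside the data file: `/tmp/headers.idx`, `/tmp/headers.off`, and `/tmp/headers.conf`.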

/// A [`RefRow`] is a list of column value slices pointing to either an internal buffer or a
/// memory-mapped file.
type RefRow<'a> = Vec<&'a [u8]>;

/// Alias type for a column value wrapped in `Result`.
pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;

/// A trait for the user-defined header of [`NippyJar`].
pub trait NippyJarHeader:
    Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

// Blanket implementation for all types that implement the required traits.
impl<T> NippyJarHeader for T where
    T: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}
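
// Any plain `serde`-compatible owned type gets this blanket impl; e.g. the test-only
// `BlockJarHeader { block_start: usize }` struct defined further below derives
// `Serialize`, `Deserialize`, and `Debug`, and therefore implements `NippyJarHeader`.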

/// `NippyJar` is a specialized storage format designed for immutable data.
///
/// Data is organized into a columnar format, enabling column-based compression. Data retrieval
/// entails consulting an offset list and fetching the data from file via `mmap`.
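///
/// A construction sketch (ignored doctest; the path, column count, and compression settings are
/// illustrative):
///
/// ```ignore
/// use std::path::Path;
///
/// // Two columns, Zstd compression with per-column dictionaries capped at 5000 bytes.
/// let jar = NippyJar::new_without_header(2, Path::new("/tmp/jar")).with_zstd(true, 5000);
/// ```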
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NippyJar<H = ()> {
    /// The version of the `NippyJar` format.
    version: usize,
    /// User-defined header data.
    /// Defaults to the zero-sized unit type, i.e. no header data.
    user_header: H,
    /// Number of data columns in the jar.
    columns: usize,
    /// Number of data rows in the jar.
    rows: usize,
    /// Optional compression algorithm applied to the data.
    compressor: Option<Compressors>,
    /// Optional field for backwards compatibility.
    #[serde(skip)]
    filter: Option<InclusionFilters>,
    /// Optional field for backwards compatibility.
    #[serde(skip)]
    phf: Option<Functions>,
    /// Maximum uncompressed row size of the set. This enables decompression without any
    /// resizing of the output buffer.
    max_row_size: usize,
    /// Data path for the file. Supporting files will have the format `{path}.{extension}`.
    #[serde(skip)]
    path: PathBuf,
}

impl<H: NippyJarHeader> std::fmt::Debug for NippyJar<H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NippyJar")
            .field("version", &self.version)
            .field("user_header", &self.user_header)
            .field("rows", &self.rows)
            .field("columns", &self.columns)
            .field("compressor", &self.compressor)
            .field("filter", &self.filter)
            .field("phf", &self.phf)
            .field("path", &self.path)
            .field("max_row_size", &self.max_row_size)
            .finish_non_exhaustive()
    }
}

impl NippyJar<()> {
    /// Creates a new [`NippyJar`] without user-defined header data.
    pub fn new_without_header(columns: usize, path: &Path) -> Self {
        Self::new(columns, path, ())
    }

    /// Loads the file configuration and returns [`Self`] for a jar without user-defined header
    /// data.
    pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
        Self::load(path)
    }
}

impl<H: NippyJarHeader> NippyJar<H> {
    /// Creates a new [`NippyJar`] with user-defined header data.
    pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
        Self {
            version: NIPPY_JAR_VERSION,
            user_header,
            columns,
            rows: 0,
            max_row_size: 0,
            compressor: None,
            filter: None,
            phf: None,
            path: path.to_path_buf(),
        }
    }

    /// Adds [`compression::Zstd`] compression.
    pub fn with_zstd(mut self, use_dict: bool, max_dict_size: usize) -> Self {
        self.compressor =
            Some(Compressors::Zstd(compression::Zstd::new(use_dict, max_dict_size, self.columns)));
        self
    }

    /// Adds [`compression::Lz4`] compression.
    pub fn with_lz4(mut self) -> Self {
        self.compressor = Some(Compressors::Lz4(compression::Lz4::default()));
        self
    }

    /// Gets a reference to the user header.
    pub const fn user_header(&self) -> &H {
        &self.user_header
    }

    /// Gets total columns in jar.
    pub const fn columns(&self) -> usize {
        self.columns
    }

    /// Gets total rows in jar.
    pub const fn rows(&self) -> usize {
        self.rows
    }

    /// Gets a reference to the compressor.
    pub const fn compressor(&self) -> Option<&Compressors> {
        self.compressor.as_ref()
    }

    /// Gets a mutable reference to the compressor.
    pub const fn compressor_mut(&mut self) -> Option<&mut Compressors> {
        self.compressor.as_mut()
    }

    /// Loads the file configuration and returns [`Self`].
    ///
    /// **The user must ensure the header type matches the one used during the jar's creation.**
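    ///
    /// A call sketch (ignored doctest; `BlockJarHeader` refers to the test-only header type
    /// defined at the bottom of this file, and the path is illustrative):
    ///
    /// ```ignore
    /// let jar: NippyJar<BlockJarHeader> = NippyJar::load(Path::new("/tmp/jar"))?;
    /// let block_start = jar.user_header().block_start;
    /// ```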
    pub fn load(path: &Path) -> Result<Self, NippyJarError> {
        // Read the configuration file located at `{path}.conf`.
        let config_path = path.with_extension(CONFIG_FILE_EXTENSION);
        let config_file = File::open(&config_path)
            .map_err(|err| reth_fs_util::FsPathError::open(err, config_path))?;

        let mut obj = Self::load_from_reader(config_file)?;
        obj.path = path.to_path_buf();
        Ok(obj)
    }

    /// Deserializes an instance of [`Self`] from a [`Read`] type.
    pub fn load_from_reader<R: Read>(reader: R) -> Result<Self, NippyJarError> {
        Ok(bincode::deserialize_from(reader)?)
    }

    /// Returns the path for the data file.
    pub fn data_path(&self) -> &Path {
        self.path.as_ref()
    }

    /// Returns the path for the index file.
    pub fn index_path(&self) -> PathBuf {
        self.path.with_extension(INDEX_FILE_EXTENSION)
    }

    /// Returns the path for the offsets file.
    pub fn offsets_path(&self) -> PathBuf {
        self.path.with_extension(OFFSETS_FILE_EXTENSION)
    }

    /// Returns the path for the config file.
    pub fn config_path(&self) -> PathBuf {
        self.path.with_extension(CONFIG_FILE_EXTENSION)
    }

    /// Deletes this [`NippyJar`] from disk, alongside every satellite file.
    pub fn delete(self) -> Result<(), NippyJarError> {
        // TODO(joshie): ensure consistency on unexpected shutdown

        for path in
            [self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()]
        {
            if path.exists() {
                debug!(target: "nippy-jar", ?path, "Removing file.");
                reth_fs_util::remove_file(path)?;
            }
        }

        Ok(())
    }

    /// Returns a [`DataReader`] over the data and offsets files.
    pub fn open_data_reader(&self) -> Result<DataReader, NippyJarError> {
        DataReader::new(self.data_path())
    }

    /// Writes all necessary configuration to file.
    fn freeze_config(&self) -> Result<(), NippyJarError> {
        Ok(reth_fs_util::atomic_write_file(&self.config_path(), |file| {
            bincode::serialize_into(file, &self)
        })?)
    }
}

#[cfg(test)]
impl<H: NippyJarHeader> NippyJar<H> {
    /// If required, prepares any compression algorithm with an early pass over the data.
    pub fn prepare_compression(
        &mut self,
        columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
    ) -> Result<(), NippyJarError> {
        // Makes any necessary preparations for the compressors.
        if let Some(compression) = &mut self.compressor {
            debug!(target: "nippy-jar", columns=columns.len(), "Preparing compression.");
            compression.prepare_compression(columns)?;
        }
        Ok(())
    }

    /// Writes all data and configuration to a file and the offset index to another.
    pub fn freeze(
        self,
        columns: Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
        total_rows: u64,
    ) -> Result<Self, NippyJarError> {
        self.check_before_freeze(&columns)?;

        debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");

        // Creates the writer, data and offsets file
        let mut writer = NippyJarWriter::new(self)?;

        // Append rows to file while holding offsets in memory
        writer.append_rows(columns, total_rows)?;

        // Flushes configuration and offsets to disk
        writer.commit()?;

        debug!(target: "nippy-jar", ?writer, "Finished writing data.");

        Ok(writer.into_jar())
    }

    /// Safety checks before creating and returning a [`File`] handle to write data to.
    fn check_before_freeze(
        &self,
        columns: &[impl IntoIterator<Item = ColumnResult<Vec<u8>>>],
    ) -> Result<(), NippyJarError> {
        if columns.len() != self.columns {
            return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
        }

        if let Some(compression) = &self.compressor {
            if !compression.is_ready() {
                return Err(NippyJarError::CompressorNotReady)
            }
        }

        Ok(())
    }
}

/// Manages the reading of static file data using memory-mapped files.
///
/// Holds file and mmap descriptors of the data and offsets files of a `static_file`.
#[derive(Debug)]
pub struct DataReader {
    /// Data file descriptor. Needs to be kept alive as long as `data_mmap` handle.
    #[expect(dead_code)]
    data_file: File,
    /// Mmap handle for data.
    data_mmap: Mmap,
    /// Offset file descriptor. Needs to be kept alive as long as `offset_mmap` handle.
    offset_file: File,
    /// Mmap handle for offsets.
    offset_mmap: Mmap,
    /// Number of bytes that represent one offset.
    offset_size: u8,
}

impl DataReader {
    /// Reads the respective data and offsets files and returns a [`DataReader`].
    pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
        let data_file = File::open(path.as_ref())?;
        // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
        let data_mmap = unsafe { Mmap::map(&data_file)? };

        let offset_file = File::open(path.as_ref().with_extension(OFFSETS_FILE_EXTENSION))?;
        // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
        let offset_mmap = unsafe { Mmap::map(&offset_file)? };

        // The first byte is the size of one offset in bytes.
        let offset_size = offset_mmap[0];

        // Ensure that the size of an offset is at most 8 bytes.
        if offset_size > 8 {
            return Err(NippyJarError::OffsetSizeTooBig { offset_size })
        } else if offset_size == 0 {
            return Err(NippyJarError::OffsetSizeTooSmall { offset_size })
        }

        Ok(Self { data_file, data_mmap, offset_file, offset_size, offset_mmap })
    }
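
    // Offsets file layout, as read above: byte 0 holds the per-offset width `N` (1..=8),
    // followed by a sequence of little-endian `N`-byte offsets, so the offset at index `i`
    // starts at byte `1 + i * N`.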

    /// Returns the offset for the requested data index.
    pub fn offset(&self, index: usize) -> Result<u64, NippyJarError> {
        // `+ 1` accounts for the `u8` offset size prefix at the beginning of the file.
        let from = index * self.offset_size as usize + 1;

        self.offset_at(from)
    }

    /// Returns the offset for the requested data index, counting from the end of the file.
    pub fn reverse_offset(&self, index: usize) -> Result<u64, NippyJarError> {
        let offsets_file_size = self.offset_file.metadata()?.len() as usize;

        if offsets_file_size > 1 {
            let from = offsets_file_size - self.offset_size as usize * (index + 1);

            self.offset_at(from)
        } else {
            Ok(0)
        }
    }

    /// Returns the total number of offsets in the file.
    /// The size of one offset is determined by the file itself.
    pub fn offsets_count(&self) -> Result<usize, NippyJarError> {
        Ok((self.offset_file.metadata()?.len().saturating_sub(1) / self.offset_size as u64)
            as usize)
    }
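
    // E.g. two rows of two columns with 8-byte offsets store `2 * 2 + 1 = 5` offsets (one per
    // column value, plus a final offset holding the expected data file size), so the offsets
    // file is `1 + 5 * 8 = 41` bytes and `offsets_count()` returns `(41 - 1) / 8 = 5`.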

    /// Reads one offset-sized (determined by the offset file) `u64` at the provided index.
    fn offset_at(&self, index: usize) -> Result<u64, NippyJarError> {
        let mut buffer: [u8; 8] = [0; 8];

        let offset_end = index.saturating_add(self.offset_size as usize);
        if offset_end > self.offset_mmap.len() {
            return Err(NippyJarError::OffsetOutOfBounds { index })
        }

        buffer[..self.offset_size as usize].copy_from_slice(&self.offset_mmap[index..offset_end]);
        Ok(u64::from_le_bytes(buffer))
    }
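
    // E.g. with `offset_size == 2`, `offset_at` copies the bytes `[0x34, 0x12]` into the zeroed
    // 8-byte buffer, which `u64::from_le_bytes` decodes as `0x1234`.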

    /// Returns the number of bytes that represent one offset.
    pub const fn offset_size(&self) -> u8 {
        self.offset_size
    }

    /// Returns the underlying data as a slice of bytes for the provided range.
    pub fn data(&self, range: Range<usize>) -> &[u8] {
        &self.data_mmap[range]
    }

    /// Returns the total size of the data.
    pub fn size(&self) -> usize {
        self.data_mmap.len()
    }
}
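
// A read sketch against the `DataReader` API above (illustrative; bytes come back exactly as
// written to the data file, i.e. still compressed when the jar was frozen with a compressor):
//
// let reader = jar.open_data_reader()?;
// let (start, end) = (reader.offset(0)? as usize, reader.offset(1)? as usize);
// let first_value = reader.data(start..end);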

#[cfg(test)]
mod tests {
    use super::*;
    use compression::Compression;
    use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
    use std::{fs::OpenOptions, io::Read};

    type ColumnResults<T> = Vec<ColumnResult<T>>;
    type ColumnValues = Vec<Vec<u8>>;

    fn test_data(seed: Option<u64>) -> (ColumnValues, ColumnValues) {
        let value_length = 32;
        let num_rows = 100;

        let mut vec: Vec<u8> = vec![0; value_length];
        let mut rng = seed.map(SmallRng::seed_from_u64).unwrap_or_else(SmallRng::from_os_rng);

        let mut entry_gen = || {
            (0..num_rows)
                .map(|_| {
                    rng.fill_bytes(&mut vec[..]);
                    vec.clone()
                })
                .collect()
        };

        (entry_gen(), entry_gen())
    }

    fn clone_with_result(col: &ColumnValues) -> ColumnResults<Vec<u8>> {
        col.iter().map(|v| Ok(v.clone())).collect()
    }

    #[test]
    fn test_config_serialization() {
        let file = tempfile::NamedTempFile::new().unwrap();
        let jar = NippyJar::new_without_header(23, file.path()).with_lz4();
        jar.freeze_config().unwrap();

        let mut config_file = OpenOptions::new().read(true).open(jar.config_path()).unwrap();
        let config_file_len = config_file.metadata().unwrap().len();
        assert_eq!(config_file_len, 37);

        let mut buf = Vec::with_capacity(config_file_len as usize);
        config_file.read_to_end(&mut buf).unwrap();

        assert_eq!(
            vec![
                1, 0, 0, 0, 0, 0, 0, 0, 23, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0
            ],
            buf
        );
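
        // A loose decode of the 37 bytes above (default bincode config, little-endian):
        // version = 1 (8 bytes), `()` header (0 bytes), columns = 23 (8 bytes), rows = 0
        // (8 bytes), compressor = Some(Lz4) (1-byte `Some` tag + 4-byte variant index), and
        // max_row_size = 0 (8 bytes); `filter` and `phf` are skipped via `#[serde(skip)]`.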

        let mut read_jar = bincode::deserialize_from::<_, NippyJar>(&buf[..]).unwrap();
        // Path is not ser/de.
        read_jar.path = file.path().to_path_buf();
        assert_eq!(jar, read_jar);
    }

    #[test]
    fn test_zstd_with_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor_mut() {
            assert!(matches!(zstd.compressors(), Err(NippyJarError::CompressorNotReady)));

            // Make sure the number of column iterators matches the one set up initially.
            assert!(matches!(
                zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]),
                Err(NippyJarError::ColumnLenMismatch(columns, 3)) if columns == num_columns
            ));
        }

        // If ZSTD is enabled, do not write to the file unless the column dictionaries have been
        // calculated.
        assert!(matches!(
            nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows),
            Err(NippyJarError::CompressorNotReady)
        ));

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap();

        if let Some(Compressors::Zstd(zstd)) = &nippy.compressor() {
            assert!(matches!(
                (&zstd.state, zstd.dictionaries.as_ref().map(|dict| dict.len())),
                (compression::ZstdState::Ready, Some(columns)) if columns == num_columns
            ));
        }

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy.version, loaded_nippy.version);
        assert_eq!(nippy.columns, loaded_nippy.columns);
        assert_eq!(nippy.filter, loaded_nippy.filter);
        assert_eq!(nippy.phf, loaded_nippy.phf);
        assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
        assert_eq!(nippy.path, loaded_nippy.path);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(zstd.use_dict);
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    #[test]
    fn test_lz4() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_lz4();
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Lz4 compressor")
        }
    }

    #[test]
    fn test_zstd_no_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(!zstd.use_dict);

            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    /// Tests `NippyJar` with everything enabled.
    #[test]
    fn test_full_nippy_jar() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        let block_start = 500;

        #[derive(Serialize, Deserialize, Debug)]
        struct BlockJarHeader {
            block_start: usize,
        }

        // Create file
        {
            let mut nippy =
                NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
                    .with_zstd(true, 5000);

            nippy.prepare_compression(data.clone()).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read file
        {
            let loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();

            assert!(loaded_nippy.compressor().is_some());
            assert_eq!(loaded_nippy.user_header().block_start, block_start);

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Iterate over compressed values and compare
                let mut row_num = 0usize;
                while let Some(row) = cursor.next_row().unwrap() {
                    assert_eq!(
                        (row[0], row[1]),
                        (data[0][row_num].as_slice(), data[1][row_num].as_slice())
                    );
                    row_num += 1;
                }

                // Shuffled for chaos.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                for (row_num, (v0, v1)) in data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (v0, v1));
                }
            }
        }
    }

    #[test]
    fn test_selectable_column_values() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        // Create file
        {
            let mut nippy =
                NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
            nippy.prepare_compression(data).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read file
        {
            let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Shuffled for chaos.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                // Imagine `Blocks` static file has two columns: `Block | StoredWithdrawals`
                const BLOCKS_FULL_MASK: usize = 0b11;

                // Read both columns
                for (row_num, (v0, v1)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_FULL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (*v0, *v1));
                }

                // Read first column only: `Block`
                const BLOCKS_BLOCK_MASK: usize = 0b01;
                for (row_num, (v0, _)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_BLOCK_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v0);
                }

                // Read second column only: `StoredWithdrawals`
                const BLOCKS_WITHDRAWAL_MASK: usize = 0b10;
                for (row_num, (_, v1)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_WITHDRAWAL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v1);
                }

                // Read nothing
                const BLOCKS_EMPTY_MASK: usize = 0b00;
                for (row_num, _) in &data {
                    // Simulates `by_number` queries
                    assert!(cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_EMPTY_MASK)
                        .unwrap()
                        .unwrap()
                        .is_empty());
                }
            }
        }
    }

    #[test]
    fn test_writer() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Appends a third row and prunes two rows, to make sure we prune from both the in-memory
        // and on-disk offset lists
        prune_rows(num_columns, file_path.path(), &col1, &col2);

        // Should be able to append new rows
        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown before there's a chance to commit, and see that it
        // unwinds successfully
        test_append_consistency_no_commit(file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown during commit, and see that it unwinds successfully
        test_append_consistency_partial_commit(file_path.path(), &col1, &col2);
    }

    #[test]
    fn test_pruner() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let num_rows = 2;

        // (missing_offsets, expected number of rows)
        // If a row wasn't fully pruned, then it should be cleaned up as well
        let missing_offsets_scenarios = [(1, 1), (2, 1), (3, 0)];

        for (missing_offsets, expected_rows) in missing_offsets_scenarios {
            let file_path = tempfile::NamedTempFile::new().unwrap();

            append_two_rows(num_columns, file_path.path(), &col1, &col2);

            simulate_interrupted_prune(num_columns, file_path.path(), num_rows, missing_offsets);

            let nippy = NippyJar::load_without_header(file_path.path()).unwrap();
            assert_eq!(nippy.rows, expected_rows);
        }
    }

    fn test_append_consistency_partial_commit(
        file_path: &Path,
        col1: &[Vec<u8>],
        col2: &[Vec<u8>],
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that should be unwound to
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a third row
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Make sure it doesn't write the last offset (which is the expected file data size)
        let _ = writer.offsets_mut().pop();

        // `commit_offsets` is not a pub function. We call it here to simulate a shutdown before
        // it can flush `nippy.rows` (config) to disk.
        writer.commit_offsets().unwrap();

        // Simulate an unexpected shutdown of the writer, before it can finish commit()
        drop(writer);

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // Data was written successfully
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // It should be + 16 (two columns were added), but one offset is missing (the one we
        // popped)
        assert_eq!(
            initial_offset_size + 8,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // The writer will execute a consistency check and verify first that the offset list on
        // disk doesn't match `nippy.rows`, and prune it. Then, it will prune the data file
        // accordingly as well.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_offset_size,
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn test_append_consistency_no_commit(file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that should be unwound to
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a third row, so we have an offset list in memory, which is not flushed to disk,
        // while the data has been.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Simulate an unexpected shutdown of the writer, before it can call commit()
        drop(writer);

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // Data was written successfully
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // Since offsets only get written on commit(), this remains the same
        assert_eq!(
            initial_offset_size,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // The writer will execute a consistency check, verify that the data file has more data
        // than it should, and reset it to the last offset of the list (on disk here)
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn append_two_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        // Create and add 1 row
        {
            let nippy = NippyJar::new_without_header(num_columns, file_path);
            nippy.freeze_config().unwrap();
            assert_eq!(nippy.max_row_size, 0);
            assert_eq!(nippy.rows, 0);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[0]))).unwrap();
            assert_eq!(writer.column(), 1);
            assert!(writer.is_dirty());

            writer.append_column(Some(Ok(&col2[0]))).unwrap();
            assert!(writer.is_dirty());

            // Adding the last column of a row resets the writer and updates the jar config
            assert_eq!(writer.column(), 0);

            // One offset per column + 1 offset at the end representing the expected file data size
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();
            assert!(!writer.is_dirty());

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 1);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }

        // Load and add 1 row
        {
            let nippy = NippyJar::load_without_header(file_path).unwrap();
            // Check if it was committed successfully
            assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
            assert_eq!(nippy.rows, 1);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[1]))).unwrap();
            assert_eq!(writer.column(), 1);

            writer.append_column(Some(Ok(&col2[1]))).unwrap();

            // Adding the last column of a row resets the writer and updates the jar config
            assert_eq!(writer.column(), 0);

            // One offset per column + 1 offset at the end representing the expected file data size
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 2);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }
    }

    fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let mut writer = NippyJarWriter::new(nippy).unwrap();

        // Appends a third row, so we have an offset list in memory, which is not flushed to disk
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();
        assert!(writer.is_dirty());

        // This should prune from both the in-memory offset list and the on-disk offset list
        writer.prune_rows(2).unwrap();
        assert_eq!(writer.rows(), 1);

        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
            1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
        );

        let expected_data_size = col1[0].len() + col2[0].len();
        assert_eq!(
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize,
            expected_data_size
        );

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        {
            let data_reader = nippy.open_data_reader().unwrap();
            // There are only two valid offsets, so index 2 actually represents the expected file
            // data size.
            assert_eq!(data_reader.offset(2).unwrap(), expected_data_size as u64);
        }

        // This should prune from the on-disk offset list and clear the jar.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.prune_rows(1).unwrap();
        assert!(writer.is_dirty());

        assert_eq!(writer.rows(), 0);
        assert_eq!(writer.max_row_size(), 0);
        assert_eq!(File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize, 0);
        // Only the byte that indicates how many bytes per offset should be left
        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize,
            1
        );
        writer.commit().unwrap();
        assert!(!writer.is_dirty());
    }

    fn simulate_interrupted_prune(
        num_columns: usize,
        file_path: &Path,
        num_rows: u64,
        missing_offsets: u64,
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let reader = nippy.open_data_reader().unwrap();
        let offsets_file =
            OpenOptions::new().read(true).write(true).open(nippy.offsets_path()).unwrap();
        let offsets_len = 1 + num_rows * num_columns as u64 * 8 + 8;
        assert_eq!(offsets_len, offsets_file.metadata().unwrap().len());

        let data_file = OpenOptions::new().read(true).write(true).open(nippy.data_path()).unwrap();
        let data_len = reader.reverse_offset(0).unwrap();
        assert_eq!(data_len, data_file.metadata().unwrap().len());

        // Each data column is 32 bytes long.
        // By truncating the data file, the `consistency_check` will go through both branches:
        //      1. the offset list wasn't updated after clearing the data (data_len > last offset);
        //      2. fixing the above leads to an offset count that doesn't match the rows
        //         (* columns) of the configuration file.
        data_file.set_len(data_len - 32 * missing_offsets).unwrap();

        // Runs the consistency check.
        let _ = NippyJarWriter::new(nippy).unwrap();
    }
}