reth_nippy_jar/
lib.rs

//! Immutable data store format.
//!
//! *Warning*: The `NippyJar` encoding format and its implementations are
//! designed for storing and retrieving data internally. They are not hardened
//! to safely read potentially malicious data.
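//!
//! # Example
//!
//! A minimal sketch of the write/read flow, mirroring this crate's tests. The path and
//! the two-column data are illustrative only:
//!
//! ```ignore
//! use reth_nippy_jar::{NippyJar, NippyJarCursor, NippyJarWriter};
//!
//! let path = std::path::Path::new("/tmp/example_jar");
//! let (col_a, col_b) = (b"first".to_vec(), b"second".to_vec());
//!
//! // Two columns, no user-defined header; persist the initial config.
//! let jar = NippyJar::new_without_header(2, path);
//! jar.freeze_config()?;
//!
//! // Append one row, column by column, then flush offsets and config to disk.
//! let mut writer = NippyJarWriter::new(jar)?;
//! writer.append_column(Some(Ok(&col_a)))?;
//! writer.append_column(Some(Ok(&col_b)))?;
//! writer.commit()?;
//!
//! // Read rows back through a cursor over the memory-mapped data file.
//! let jar = NippyJar::load_without_header(path)?;
//! let mut cursor = NippyJarCursor::new(&jar)?;
//! while let Some(row) = cursor.next_row()? {
//!     // `row` is a `Vec<&[u8]>` of column value slices.
//! }
//! ```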

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
    error::Error as StdError,
    fs::File,
    io::Read,
    ops::Range,
    path::{Path, PathBuf},
};
use tracing::*;

/// Compression algorithms supported by `NippyJar`.
pub mod compression;
#[cfg(test)]
use compression::Compression;
use compression::Compressors;

/// Empty enum kept for backwards compatibility.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum Functions {}

/// Empty enum kept for backwards compatibility.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum InclusionFilters {}

mod error;
pub use error::NippyJarError;

mod cursor;
pub use cursor::NippyJarCursor;

mod writer;
pub use writer::NippyJarWriter;

mod consistency;
pub use consistency::NippyJarChecker;

/// The version number of the Nippy Jar format.
const NIPPY_JAR_VERSION: usize = 1;
/// The file extension used for index files.
const INDEX_FILE_EXTENSION: &str = "idx";
/// The file extension used for offsets files.
const OFFSETS_FILE_EXTENSION: &str = "off";
/// The file extension used for configuration files.
pub const CONFIG_FILE_EXTENSION: &str = "conf";

/// A [`RefRow`] is a list of column value slices pointing to either an internal buffer or a
/// memory-mapped file.
type RefRow<'a> = Vec<&'a [u8]>;

/// Alias type for a column value wrapped in `Result`.
pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;

/// A trait for the user-defined header of [`NippyJar`].
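///
/// Thanks to the blanket implementation below, any `Send + Sync` serde type with `Debug`
/// qualifies. A hypothetical header:
///
/// ```ignore
/// use serde::{Deserialize, Serialize};
///
/// #[derive(Debug, Serialize, Deserialize)]
/// struct BlockRangeHeader {
///     first_block: u64,
/// }
/// // `BlockRangeHeader` now satisfies `NippyJarHeader` automatically.
/// ```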
pub trait NippyJarHeader:
    Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

// Blanket implementation for all types that implement the required traits.
impl<T> NippyJarHeader for T where
    T: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

/// `NippyJar` is a specialized storage format designed for immutable data.
///
/// Data is organized into a columnar format, enabling column-based compression. Data retrieval
/// entails consulting an offset list and fetching the data from file via `mmap`.
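///
/// A hypothetical construction sketch:
///
/// ```ignore
/// // Two columns, Zstd compression with dictionaries capped at 5000 bytes.
/// let jar = NippyJar::new_without_header(2, path).with_zstd(true, 5000);
/// ```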
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NippyJar<H = ()> {
    /// The version of the `NippyJar` format.
    version: usize,
    /// User-defined header data.
    /// Defaults to the zero-sized unit type, i.e. no header data.
    user_header: H,
    /// Number of data columns in the jar.
    columns: usize,
    /// Number of data rows in the jar.
    rows: usize,
    /// Optional compression algorithm applied to the data.
    compressor: Option<Compressors>,
    #[serde(skip)]
    /// Optional field for backwards compatibility.
    filter: Option<InclusionFilters>,
    #[serde(skip)]
    /// Optional field for backwards compatibility.
    phf: Option<Functions>,
    /// Maximum uncompressed row size of the set. This enables decompression without any
    /// resizing of the output buffer.
    max_row_size: usize,
    /// Data path for the file. Supporting files use the format `{path}.{extension}`.
    #[serde(skip)]
    path: PathBuf,
}

impl<H: NippyJarHeader> std::fmt::Debug for NippyJar<H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NippyJar")
            .field("version", &self.version)
            .field("user_header", &self.user_header)
            .field("rows", &self.rows)
            .field("columns", &self.columns)
            .field("compressor", &self.compressor)
            .field("filter", &self.filter)
            .field("phf", &self.phf)
            .field("path", &self.path)
            .field("max_row_size", &self.max_row_size)
            .finish_non_exhaustive()
    }
}

impl NippyJar<()> {
    /// Creates a new [`NippyJar`] without user-defined header data.
    pub fn new_without_header(columns: usize, path: &Path) -> Self {
        Self::new(columns, path, ())
    }

    /// Loads the file configuration and returns [`Self`] for a jar without user-defined header
    /// data.
    pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
        Self::load(path)
    }
}

impl<H: NippyJarHeader> NippyJar<H> {
    /// Creates a new [`NippyJar`] with user-defined header data.
    pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
        Self {
            version: NIPPY_JAR_VERSION,
            user_header,
            columns,
            rows: 0,
            max_row_size: 0,
            compressor: None,
            filter: None,
            phf: None,
            path: path.to_path_buf(),
        }
    }

    /// Adds [`compression::Zstd`] compression.
    pub fn with_zstd(mut self, use_dict: bool, max_dict_size: usize) -> Self {
        self.compressor =
            Some(Compressors::Zstd(compression::Zstd::new(use_dict, max_dict_size, self.columns)));
        self
    }

    /// Adds [`compression::Lz4`] compression.
    pub fn with_lz4(mut self) -> Self {
        self.compressor = Some(Compressors::Lz4(compression::Lz4::default()));
        self
    }

    /// Gets a reference to the user header.
    pub const fn user_header(&self) -> &H {
        &self.user_header
    }

    /// Gets total columns in jar.
    pub const fn columns(&self) -> usize {
        self.columns
    }

    /// Gets total rows in jar.
    pub const fn rows(&self) -> usize {
        self.rows
    }

    /// Gets a reference to the compressor.
    pub const fn compressor(&self) -> Option<&Compressors> {
        self.compressor.as_ref()
    }

    /// Gets a mutable reference to the compressor.
    pub const fn compressor_mut(&mut self) -> Option<&mut Compressors> {
        self.compressor.as_mut()
    }

    /// Loads the file configuration and returns [`Self`].
    ///
    /// **The user must ensure the header type matches the one used during the jar's creation.**
    pub fn load(path: &Path) -> Result<Self, NippyJarError> {
        // Read [`Self`] from the config file located next to the data file.
        let config_path = path.with_extension(CONFIG_FILE_EXTENSION);
        let config_file = File::open(&config_path)
            .map_err(|err| reth_fs_util::FsPathError::open(err, config_path))?;

        let mut obj = Self::load_from_reader(config_file)?;
        obj.path = path.to_path_buf();
        Ok(obj)
    }

    /// Deserializes an instance of [`Self`] from a [`Read`] type.
    pub fn load_from_reader<R: Read>(reader: R) -> Result<Self, NippyJarError> {
        Ok(bincode::deserialize_from(reader)?)
    }

    /// Returns the path for the data file.
    pub fn data_path(&self) -> &Path {
        self.path.as_ref()
    }

    /// Returns the path for the index file.
    pub fn index_path(&self) -> PathBuf {
        self.path.with_extension(INDEX_FILE_EXTENSION)
    }

    /// Returns the path for the offsets file.
    pub fn offsets_path(&self) -> PathBuf {
        self.path.with_extension(OFFSETS_FILE_EXTENSION)
    }

    /// Returns the path for the config file.
    pub fn config_path(&self) -> PathBuf {
        self.path.with_extension(CONFIG_FILE_EXTENSION)
    }

    /// Deletes this [`NippyJar`] from disk, alongside every satellite file.
    pub fn delete(self) -> Result<(), NippyJarError> {
        // TODO(joshie): ensure consistency on unexpected shutdown

        for path in
            [self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()]
        {
            if path.exists() {
                reth_fs_util::remove_file(path)?;
            }
        }

        Ok(())
    }

    /// Returns a [`DataReader`] for the data and offsets files.
    pub fn open_data_reader(&self) -> Result<DataReader, NippyJarError> {
        DataReader::new(self.data_path())
    }

    /// Writes all necessary configuration to file.
    fn freeze_config(&self) -> Result<(), NippyJarError> {
        Ok(reth_fs_util::atomic_write_file(&self.config_path(), |file| {
            bincode::serialize_into(file, &self)
        })?)
    }
}

#[cfg(test)]
impl<H: NippyJarHeader> NippyJar<H> {
    /// If required, prepares any compression algorithm for an early pass over the data.
    pub fn prepare_compression(
        &mut self,
        columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
    ) -> Result<(), NippyJarError> {
        // Makes any necessary preparations for the compressors
        if let Some(compression) = &mut self.compressor {
            debug!(target: "nippy-jar", columns=columns.len(), "Preparing compression.");
            compression.prepare_compression(columns)?;
        }
        Ok(())
    }

    /// Writes all data and configuration to a file and the offset index to another.
    pub fn freeze(
        self,
        columns: Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
        total_rows: u64,
    ) -> Result<Self, NippyJarError> {
        self.check_before_freeze(&columns)?;

        debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");

        // Creates the writer, data and offsets file
        let mut writer = NippyJarWriter::new(self)?;

        // Append rows to file while holding offsets in memory
        writer.append_rows(columns, total_rows)?;

        // Flushes configuration and offsets to disk
        writer.commit()?;

        debug!(target: "nippy-jar", ?writer, "Finished writing data.");

        Ok(writer.into_jar())
    }

    /// Safety checks before creating and returning a [`File`] handle to write data to.
    fn check_before_freeze(
        &self,
        columns: &[impl IntoIterator<Item = ColumnResult<Vec<u8>>>],
    ) -> Result<(), NippyJarError> {
        if columns.len() != self.columns {
            return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
        }

        if let Some(compression) = &self.compressor {
            if !compression.is_ready() {
                return Err(NippyJarError::CompressorNotReady)
            }
        }

        Ok(())
    }
}

/// Manages the reading of static file data using memory-mapped files.
///
/// Holds file and mmap descriptors of the data and offsets files of a `static_file`.
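///
/// The offsets file starts with a single `u8` holding the per-offset byte width,
/// followed by one little-endian offset per column value, plus a final offset
/// recording the expected data file size (see the reads below).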
#[derive(Debug)]
pub struct DataReader {
    /// Data file descriptor. Needs to be kept alive as long as the `data_mmap` handle.
    #[expect(dead_code)]
    data_file: File,
    /// Mmap handle for data.
    data_mmap: Mmap,
    /// Offset file descriptor. Needs to be kept alive as long as the `offset_mmap` handle.
    offset_file: File,
    /// Mmap handle for offsets.
    offset_mmap: Mmap,
    /// Number of bytes that represent one offset.
    offset_size: u8,
}

impl DataReader {
    /// Reads the respective data and offsets file and returns [`DataReader`].
    pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
        let data_file = File::open(path.as_ref())?;
        // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
        let data_mmap = unsafe { Mmap::map(&data_file)? };

        let offset_file = File::open(path.as_ref().with_extension(OFFSETS_FILE_EXTENSION))?;
        // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
        let offset_mmap = unsafe { Mmap::map(&offset_file)? };

        // First byte is the size of one offset in bytes
        let offset_size = offset_mmap[0];

        // Ensure that the size of an offset is at most 8 bytes.
        if offset_size > 8 {
            return Err(NippyJarError::OffsetSizeTooBig { offset_size })
        } else if offset_size == 0 {
            return Err(NippyJarError::OffsetSizeTooSmall { offset_size })
        }

        Ok(Self { data_file, data_mmap, offset_file, offset_size, offset_mmap })
    }

    /// Returns the offset for the requested data index.
    pub fn offset(&self, index: usize) -> Result<u64, NippyJarError> {
        // The `+ 1` skips the leading u8 that stores the per-offset byte size.
        let from = index * self.offset_size as usize + 1;

        self.offset_at(from)
    }

    /// Returns the offset for the requested data index starting from the end.
    pub fn reverse_offset(&self, index: usize) -> Result<u64, NippyJarError> {
        let offsets_file_size = self.offset_file.metadata()?.len() as usize;

        if offsets_file_size > 1 {
            let from = offsets_file_size - self.offset_size as usize * (index + 1);

            self.offset_at(from)
        } else {
            Ok(0)
        }
    }

    /// Returns total number of offsets in the file.
    /// The size of one offset is determined by the file itself.
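    /// Computed as `(offsets_file_len - 1) / offset_size`; the leading width byte is
    /// excluded from the count.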
    pub fn offsets_count(&self) -> Result<usize, NippyJarError> {
        Ok((self.offset_file.metadata()?.len().saturating_sub(1) / self.offset_size as u64)
            as usize)
    }

    /// Reads one offset-sized (determined by the offset file) u64 at the provided index.
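    ///
    /// For example, with `offset_size == 2` and mmap bytes `[0x34, 0x12]` at `index`,
    /// this returns `0x1234` (little-endian, zero-extended to `u64`).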
    fn offset_at(&self, index: usize) -> Result<u64, NippyJarError> {
        let mut buffer: [u8; 8] = [0; 8];

        let offset_end = index.saturating_add(self.offset_size as usize);
        if offset_end > self.offset_mmap.len() {
            return Err(NippyJarError::OffsetOutOfBounds { index })
        }

        buffer[..self.offset_size as usize].copy_from_slice(&self.offset_mmap[index..offset_end]);
        Ok(u64::from_le_bytes(buffer))
    }

    /// Returns number of bytes that represent one offset.
    pub const fn offset_size(&self) -> u8 {
        self.offset_size
    }

    /// Returns the underlying data as a slice of bytes for the provided range.
    pub fn data(&self, range: Range<usize>) -> &[u8] {
        &self.data_mmap[range]
    }

    /// Returns total size of data.
    pub fn size(&self) -> usize {
        self.data_mmap.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use compression::Compression;
    use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
    use std::{fs::OpenOptions, io::Read};

    type ColumnResults<T> = Vec<ColumnResult<T>>;
    type ColumnValues = Vec<Vec<u8>>;

    fn test_data(seed: Option<u64>) -> (ColumnValues, ColumnValues) {
        let value_length = 32;
        let num_rows = 100;

        let mut vec: Vec<u8> = vec![0; value_length];
        let mut rng = seed.map(SmallRng::seed_from_u64).unwrap_or_else(SmallRng::from_os_rng);

        let mut entry_gen = || {
            (0..num_rows)
                .map(|_| {
                    rng.fill_bytes(&mut vec[..]);
                    vec.clone()
                })
                .collect()
        };

        (entry_gen(), entry_gen())
    }

    fn clone_with_result(col: &ColumnValues) -> ColumnResults<Vec<u8>> {
        col.iter().map(|v| Ok(v.clone())).collect()
    }

    #[test]
    fn test_config_serialization() {
        let file = tempfile::NamedTempFile::new().unwrap();
        let jar = NippyJar::new_without_header(23, file.path()).with_lz4();
        jar.freeze_config().unwrap();

        let mut config_file = OpenOptions::new().read(true).open(jar.config_path()).unwrap();
        let config_file_len = config_file.metadata().unwrap().len();
        assert_eq!(config_file_len, 37);

        let mut buf = Vec::with_capacity(config_file_len as usize);
        config_file.read_to_end(&mut buf).unwrap();

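        // Expected little-endian bincode layout: version `usize` (8 bytes) + columns `usize`
        // (8) + rows `usize` (8) + `Some` tag (1) + compressor variant index `u32` (4) +
        // max_row_size `usize` (8) = 37 bytes. The `()` header and `#[serde(skip)]` fields
        // serialize to nothing.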
        assert_eq!(
            vec![
                1, 0, 0, 0, 0, 0, 0, 0, 23, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0
            ],
            buf
        );

        let mut read_jar = bincode::deserialize_from::<_, NippyJar>(&buf[..]).unwrap();
        // Path is not ser/de
        read_jar.path = file.path().to_path_buf();
        assert_eq!(jar, read_jar);
    }

    #[test]
    fn test_zstd_with_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor_mut() {
            assert!(matches!(zstd.compressors(), Err(NippyJarError::CompressorNotReady)));

            // Make sure the number of column iterators matches the number of columns
            // configured at construction.
            assert!(matches!(
                zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]),
                Err(NippyJarError::ColumnLenMismatch(columns, 3)) if columns == num_columns
            ));
        }

        // If ZSTD is enabled, do not write to the file unless the column dictionaries have been
        // calculated.
        assert!(matches!(
            nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows),
            Err(NippyJarError::CompressorNotReady)
        ));

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap();

        if let Some(Compressors::Zstd(zstd)) = &nippy.compressor() {
            assert!(matches!(
                (&zstd.state, zstd.dictionaries.as_ref().map(|dict| dict.len())),
                (compression::ZstdState::Ready, Some(columns)) if columns == num_columns
            ));
        }

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy.version, loaded_nippy.version);
        assert_eq!(nippy.columns, loaded_nippy.columns);
        assert_eq!(nippy.filter, loaded_nippy.filter);
        assert_eq!(nippy.phf, loaded_nippy.phf);
        assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
        assert_eq!(nippy.path, loaded_nippy.path);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(zstd.use_dict);
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    #[test]
    fn test_lz4() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_lz4();
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Lz4 compressor")
        }
    }

    #[test]
    fn test_zstd_no_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(!zstd.use_dict);

            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    /// Tests `NippyJar` with everything enabled.
    #[test]
    fn test_full_nippy_jar() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        let block_start = 500;

        #[derive(Serialize, Deserialize, Debug)]
        struct BlockJarHeader {
            block_start: usize,
        }

        // Create file
        {
            let mut nippy =
                NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
                    .with_zstd(true, 5000);

            nippy.prepare_compression(data.clone()).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read file
        {
            let loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();

            assert!(loaded_nippy.compressor().is_some());
            assert_eq!(loaded_nippy.user_header().block_start, block_start);

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Iterate over compressed values and compare
                let mut row_num = 0usize;
                while let Some(row) = cursor.next_row().unwrap() {
                    assert_eq!(
                        (row[0], row[1]),
                        (data[0][row_num].as_slice(), data[1][row_num].as_slice())
                    );
                    row_num += 1;
                }

                // Shuffled for chaos.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                for (row_num, (v0, v1)) in data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (v0, v1));
                }
            }
        }
    }

    #[test]
    fn test_selectable_column_values() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        // Create file
        {
            let mut nippy =
                NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
            nippy.prepare_compression(data).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read file
        {
            let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Shuffled for chaos.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                // Imagine `Blocks` static file has two columns: `Block | StoredWithdrawals`
                const BLOCKS_FULL_MASK: usize = 0b11;

                // Read both columns
                for (row_num, (v0, v1)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_FULL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (*v0, *v1));
                }

                // Read first column only: `Block`
                const BLOCKS_BLOCK_MASK: usize = 0b01;
                for (row_num, (v0, _)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_BLOCK_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v0);
                }

                // Read second column only: `StoredWithdrawals`
                const BLOCKS_WITHDRAWAL_MASK: usize = 0b10;
                for (row_num, (_, v1)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_WITHDRAWAL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v1);
                }

                // Read nothing
                const BLOCKS_EMPTY_MASK: usize = 0b00;
                for (row_num, _) in &data {
                    // Simulates `by_number` queries
                    assert!(cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_EMPTY_MASK)
                        .unwrap()
                        .unwrap()
                        .is_empty());
                }
            }
        }
    }

    #[test]
    fn test_writer() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Appends a third row and prunes two rows, to make sure we prune from both the in-memory
        // and on-disk offset lists
        prune_rows(num_columns, file_path.path(), &col1, &col2);

        // Should be able to append new rows
        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown before there's a chance to commit, and see that it
        // unwinds successfully
        test_append_consistency_no_commit(file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown during commit, and see that it unwinds successfully
        test_append_consistency_partial_commit(file_path.path(), &col1, &col2);
    }

    #[test]
    fn test_pruner() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let num_rows = 2;

        // (missing_offsets, expected number of rows)
        // If a row wasn't fully pruned, then it should be cleaned up as well
        let missing_offsets_scenarios = [(1, 1), (2, 1), (3, 0)];

        for (missing_offsets, expected_rows) in missing_offsets_scenarios {
            let file_path = tempfile::NamedTempFile::new().unwrap();

            append_two_rows(num_columns, file_path.path(), &col1, &col2);

            simulate_interrupted_prune(num_columns, file_path.path(), num_rows, missing_offsets);

            let nippy = NippyJar::load_without_header(file_path.path()).unwrap();
            assert_eq!(nippy.rows, expected_rows);
        }
    }

    fn test_append_consistency_partial_commit(
        file_path: &Path,
        col1: &[Vec<u8>],
        col2: &[Vec<u8>],
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that should be unwound to
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a third row
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Make sure the last offset (which is the expected file data size) is not written
        let _ = writer.offsets_mut().pop();

        // `commit_offsets` is not a pub function. We call it here to simulate the shutdown before
        // it can flush nippy.rows (config) to disk.
        writer.commit_offsets().unwrap();

        // Simulate an unexpected shutdown of the writer, before it can finish commit()
        drop(writer);

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // Data was written successfully
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // It should be + 16 (two columns were added), but one offset is missing (the one we
        // popped)
        assert_eq!(
            initial_offset_size + 8,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // Writer will execute a consistency check and verify first that the offset list on disk
        // doesn't match the nippy.rows, and prune it. Then, it will prune the data file
        // accordingly as well.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_offset_size,
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn test_append_consistency_no_commit(file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that should be unwound to
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a third row, so we have an offset list in memory, which is not flushed to disk,
        // while the data has been.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Simulate an unexpected shutdown of the writer, before it can call commit()
        drop(writer);

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // Data was written successfully
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // Since offsets only get written on commit(), this remains the same
        assert_eq!(
            initial_offset_size,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // Writer will execute a consistency check, verify that the data file has more data than
        // it should, and reset it to the last offset of the list (on disk here)
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn append_two_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        // Create and add 1 row
        {
            let nippy = NippyJar::new_without_header(num_columns, file_path);
            nippy.freeze_config().unwrap();
            assert_eq!(nippy.max_row_size, 0);
            assert_eq!(nippy.rows, 0);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[0]))).unwrap();
            assert_eq!(writer.column(), 1);
            assert!(writer.is_dirty());

            writer.append_column(Some(Ok(&col2[0]))).unwrap();
            assert!(writer.is_dirty());

            // Adding last column of a row resets writer and updates jar config
            assert_eq!(writer.column(), 0);

            // One offset per column + 1 offset at the end representing the expected file data size
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();
            assert!(!writer.is_dirty());

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 1);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }

        // Load and add 1 row
        {
            let nippy = NippyJar::load_without_header(file_path).unwrap();
            // Check if it was committed successfully
            assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
            assert_eq!(nippy.rows, 1);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[1]))).unwrap();
            assert_eq!(writer.column(), 1);

            writer.append_column(Some(Ok(&col2[1]))).unwrap();

            // Adding last column of a row resets writer and updates jar config
            assert_eq!(writer.column(), 0);

            // One offset per column + 1 offset at the end representing the expected file data size
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 2);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }
    }

    fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let mut writer = NippyJarWriter::new(nippy).unwrap();

        // Appends a third row, so we have an offset list in memory, which is not flushed to disk
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();
        assert!(writer.is_dirty());

        // This should prune from both the in-memory and on-disk offset lists
        writer.prune_rows(2).unwrap();
        assert_eq!(writer.rows(), 1);

        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
            1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
        );

        let expected_data_size = col1[0].len() + col2[0].len();
        assert_eq!(
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize,
            expected_data_size
        );

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        {
            let data_reader = nippy.open_data_reader().unwrap();
            // There are only two valid offsets, so index 2 actually represents the expected file
            // data size.
            assert_eq!(data_reader.offset(2).unwrap(), expected_data_size as u64);
        }

        // This should prune from the on-disk offset list and clear the jar.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.prune_rows(1).unwrap();
        assert!(writer.is_dirty());

        assert_eq!(writer.rows(), 0);
        assert_eq!(writer.max_row_size(), 0);
        assert_eq!(File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize, 0);
        // Only the byte that indicates how many bytes per offset should be left
        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize,
            1
        );
        writer.commit().unwrap();
        assert!(!writer.is_dirty());
    }

    fn simulate_interrupted_prune(
        num_columns: usize,
        file_path: &Path,
        num_rows: u64,
        missing_offsets: u64,
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let reader = nippy.open_data_reader().unwrap();
        let offsets_file =
            OpenOptions::new().read(true).write(true).open(nippy.offsets_path()).unwrap();
        let offsets_len = 1 + num_rows * num_columns as u64 * 8 + 8;
        assert_eq!(offsets_len, offsets_file.metadata().unwrap().len());

        let data_file = OpenOptions::new().read(true).write(true).open(nippy.data_path()).unwrap();
        let data_len = reader.reverse_offset(0).unwrap();
        assert_eq!(data_len, data_file.metadata().unwrap().len());

        // Each data column is 32 bytes long.
        // By truncating the data file, the `consistency_check` will go through both branches:
        // first, the offset list wasn't updated after clearing the data (data_len > last
        // offset); fixing that will then make the offset count disagree with the rows (*
        // columns) of the configuration file.
        data_file.set_len(data_len - 32 * missing_offsets).unwrap();

        // Runs the consistency check.
        let _ = NippyJarWriter::new(nippy).unwrap();
    }
}