reth_nippy_jar/lib.rs

//! Immutable data store format.
//!
//! *Warning*: The `NippyJar` encoding format and its implementations are
//! designed for storing and retrieving data internally. They are not hardened
//! to safely read potentially malicious data.

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]

use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
    error::Error as StdError,
    fs::File,
    io::{self, Read, Write},
    ops::Range,
    path::{Path, PathBuf},
};
use tracing::*;

/// Compression algorithms supported by `NippyJar`.
pub mod compression;
#[cfg(test)]
use compression::Compression;
use compression::Compressors;

/// Empty enum, kept for backwards compatibility.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum Functions {}

/// Empty enum, kept for backwards compatibility.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum InclusionFilters {}

mod error;
pub use error::NippyJarError;

mod cursor;
pub use cursor::NippyJarCursor;

mod writer;
pub use writer::NippyJarWriter;

mod consistency;
pub use consistency::NippyJarChecker;

/// The version number of the Nippy Jar format.
const NIPPY_JAR_VERSION: usize = 1;
/// The file extension used for index files.
const INDEX_FILE_EXTENSION: &str = "idx";
/// The file extension used for offsets files.
const OFFSETS_FILE_EXTENSION: &str = "off";
/// The file extension used for configuration files.
pub const CONFIG_FILE_EXTENSION: &str = "conf";
/// The file extension used for changeset offset sidecar files.
pub const CHANGESET_OFFSETS_FILE_EXTENSION: &str = "csoff";

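// Illustrative sketch (not part of the original source): for a hypothetical data file
// at `static_files/headers`, the extensions above yield this file family on disk:
//
//   static_files/headers        data
//   static_files/headers.idx    index
//   static_files/headers.off    offsets
//   static_files/headers.conf   configuration
//   static_files/headers.csoff  changeset offsets sidecar
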
/// A [`RefRow`] is a list of column value slices pointing to either an internal buffer or a
/// memory-mapped file.
type RefRow<'a> = Vec<&'a [u8]>;

/// Alias type for a column value wrapped in `Result`.
pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;

/// A trait for the user-defined header of [`NippyJar`].
pub trait NippyJarHeader:
    Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

// Blanket implementation for all types that implement the required traits.
impl<T> NippyJarHeader for T where
    T: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}
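
// Example (illustrative sketch, not part of the original source): thanks to the blanket
// implementation above, any `Serialize + Deserialize + Debug` type can serve as a jar
// header. `BlockJarHeader` mirrors the header used in `test_full_nippy_jar` below, and
// `path` stands in for a hypothetical `&Path`.
//
// #[derive(Debug, serde::Serialize, serde::Deserialize)]
// struct BlockJarHeader {
//     block_start: usize,
// }
//
// let jar = NippyJar::new(2, path, BlockJarHeader { block_start: 500 });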

/// `NippyJar` is a specialized storage format designed for immutable data.
///
/// Data is organized into a columnar format, enabling column-based compression. Data retrieval
/// entails consulting an offset list and fetching the data from file via `mmap`.
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NippyJar<H = ()> {
    /// The version of the `NippyJar` format.
    version: usize,
    /// User-defined header data.
    /// Defaults to the zero-sized unit type `()`, i.e. no header data.
    user_header: H,
    /// Number of data columns in the jar.
    columns: usize,
    /// Number of data rows in the jar.
    rows: usize,
    /// Optional compression algorithm applied to the data.
    compressor: Option<Compressors>,
    #[serde(skip)]
    /// Optional field kept for backwards compatibility.
    filter: Option<InclusionFilters>,
    #[serde(skip)]
    /// Optional field kept for backwards compatibility.
    phf: Option<Functions>,
    /// Maximum uncompressed row size of the set. This enables decompression without any
    /// resizing of the output buffer.
    max_row_size: usize,
    /// Data path for the file. Supporting files will have the format `{path}.{extension}`.
    #[serde(skip)]
    path: PathBuf,
}

impl<H: NippyJarHeader> std::fmt::Debug for NippyJar<H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NippyJar")
            .field("version", &self.version)
            .field("user_header", &self.user_header)
            .field("rows", &self.rows)
            .field("columns", &self.columns)
            .field("compressor", &self.compressor)
            .field("filter", &self.filter)
            .field("phf", &self.phf)
            .field("path", &self.path)
            .field("max_row_size", &self.max_row_size)
            .finish_non_exhaustive()
    }
}

impl NippyJar<()> {
    /// Creates a new [`NippyJar`] without user-defined header data.
    pub fn new_without_header(columns: usize, path: &Path) -> Self {
        Self::new(columns, path, ())
    }

    /// Loads the file configuration and returns [`Self`] for a jar without user-defined header
    /// data.
    pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
        Self::load(path)
    }
}

impl<H: NippyJarHeader> NippyJar<H> {
    /// Creates a new [`NippyJar`] with user-defined header data.
    pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
        Self {
            version: NIPPY_JAR_VERSION,
            user_header,
            columns,
            rows: 0,
            max_row_size: 0,
            compressor: None,
            filter: None,
            phf: None,
            path: path.to_path_buf(),
        }
    }

    /// Adds [`compression::Zstd`] compression.
    pub fn with_zstd(mut self, use_dict: bool, max_dict_size: usize) -> Self {
        self.compressor =
            Some(Compressors::Zstd(compression::Zstd::new(use_dict, max_dict_size, self.columns)));
        self
    }

    /// Adds [`compression::Lz4`] compression.
    pub fn with_lz4(mut self) -> Self {
        self.compressor = Some(Compressors::Lz4(compression::Lz4::default()));
        self
    }
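
    // Builder sketch (illustrative, not part of the original source): compression is
    // chosen at construction time by chaining one of the methods above. The column
    // count `2` and the `5000`-byte maximum dictionary size mirror the tests below;
    // `path` is a hypothetical `&Path`.
    //
    // let zstd_jar = NippyJar::new_without_header(2, path).with_zstd(true, 5000);
    // let lz4_jar = NippyJar::new_without_header(2, path).with_lz4();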

    /// Gets a reference to the user header.
    pub const fn user_header(&self) -> &H {
        &self.user_header
    }

    /// Gets total columns in jar.
    pub const fn columns(&self) -> usize {
        self.columns
    }

    /// Gets total rows in jar.
    pub const fn rows(&self) -> usize {
        self.rows
    }

    /// Gets a reference to the compressor.
    pub const fn compressor(&self) -> Option<&Compressors> {
        self.compressor.as_ref()
    }

    /// Gets a mutable reference to the compressor.
    pub const fn compressor_mut(&mut self) -> Option<&mut Compressors> {
        self.compressor.as_mut()
    }

    /// Loads the file configuration and returns [`Self`].
    ///
    /// **The user must ensure the header type matches the one used during the jar's creation.**
    pub fn load(path: &Path) -> Result<Self, NippyJarError> {
        // Read [`Self`] from the config file located next to the data file.
        let config_path = path.with_extension(CONFIG_FILE_EXTENSION);
        let config_file = File::open(&config_path)
            .inspect_err(|e| {
                warn!(?path, %e, "Failed to load static file jar");
            })
            .map_err(|err| reth_fs_util::FsPathError::open(err, config_path))?;

        let mut obj = Self::load_from_reader(io::BufReader::new(config_file))?;
        obj.path = path.to_path_buf();
        Ok(obj)
    }

    /// Deserializes an instance of [`Self`] from a [`Read`] type.
    pub fn load_from_reader<R: Read>(reader: R) -> Result<Self, NippyJarError> {
        Ok(bincode::deserialize_from(reader)?)
    }

    /// Serializes an instance of [`Self`] to a [`Write`] type.
    pub fn save_to_writer<W: Write>(&self, writer: W) -> Result<(), NippyJarError> {
        Ok(bincode::serialize_into(writer, self)?)
    }
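
    // Round-trip sketch (illustrative, not part of the original source): the pair of
    // methods above (de)serialize the config with bincode, as `test_config_serialization`
    // below exercises. Note that `path` is `#[serde(skip)]`, so the caller must restore
    // it after deserializing. `jar` stands in for an existing `NippyJar`.
    //
    // let mut buf = Vec::new();
    // jar.save_to_writer(&mut buf).unwrap();
    // let read_back: NippyJar = NippyJar::load_from_reader(&buf[..]).unwrap();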

    /// Returns the path for the data file
    pub fn data_path(&self) -> &Path {
        self.path.as_ref()
    }

    /// Returns the path for the index file
    pub fn index_path(&self) -> PathBuf {
        self.path.with_extension(INDEX_FILE_EXTENSION)
    }

    /// Returns the path for the offsets file
    pub fn offsets_path(&self) -> PathBuf {
        self.path.with_extension(OFFSETS_FILE_EXTENSION)
    }

    /// Returns the path for the config file
    pub fn config_path(&self) -> PathBuf {
        self.path.with_extension(CONFIG_FILE_EXTENSION)
    }

    /// Returns the path for the changeset offsets sidecar file.
    pub fn changeset_offsets_path(&self) -> PathBuf {
        self.path.with_extension(CHANGESET_OFFSETS_FILE_EXTENSION)
    }

    /// Deletes this [`NippyJar`] from disk, along with every satellite file.
    pub fn delete(self) -> Result<(), NippyJarError> {
        // TODO(joshie): ensure consistency on unexpected shutdown

        for path in [
            self.data_path().into(),
            self.index_path(),
            self.offsets_path(),
            self.config_path(),
            self.changeset_offsets_path(),
        ] {
            if path.exists() {
                debug!(target: "nippy-jar", ?path, "Removing file.");
                reth_fs_util::remove_file(path)?;
            }
        }

        Ok(())
    }

    /// Returns a [`DataReader`] for the data and offsets files.
    pub fn open_data_reader(&self) -> Result<DataReader, NippyJarError> {
        DataReader::new(self.data_path())
    }

    /// Writes all necessary configuration to file.
    fn freeze_config(&self) -> Result<(), NippyJarError> {
        Ok(reth_fs_util::atomic_write_file(&self.config_path(), |file| self.save_to_writer(file))?)
    }
}
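
// Example (a minimal end-to-end sketch, not part of the original source): create a
// two-column jar, append one row column-by-column with `NippyJarWriter`, commit, and
// read it back through `NippyJarCursor`, mirroring the tests below. `row` holds
// placeholder byte vectors; errors are unwrapped for brevity.
//
// let path = tempfile::NamedTempFile::new().unwrap();
// let jar = NippyJar::new_without_header(2, path.path());
// jar.freeze_config().unwrap();
//
// let row = [vec![0u8; 32], vec![1u8; 32]];
// let mut writer = NippyJarWriter::new(jar).unwrap();
// writer.append_column(Some(Ok(&row[0]))).unwrap();
// writer.append_column(Some(Ok(&row[1]))).unwrap();
// writer.commit().unwrap();
//
// let jar = NippyJar::load_without_header(path.path()).unwrap();
// let mut cursor = NippyJarCursor::new(&jar).unwrap();
// while let Some(read_row) = cursor.next_row().unwrap() {
//     assert_eq!(read_row, vec![&row[0][..], &row[1][..]]);
// }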

#[cfg(test)]
impl<H: NippyJarHeader> NippyJar<H> {
    /// If required, prepares any compression algorithm with an early pass over the data.
    pub fn prepare_compression(
        &mut self,
        columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
    ) -> Result<(), NippyJarError> {
        // Makes any necessary preparations for the compressors
        if let Some(compression) = &mut self.compressor {
            debug!(target: "nippy-jar", columns=columns.len(), "Preparing compression.");
            compression.prepare_compression(columns)?;
        }
        Ok(())
    }

    /// Writes all data and configuration to a file and the offset index to another.
    pub fn freeze(
        self,
        columns: Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
        total_rows: u64,
    ) -> Result<Self, NippyJarError> {
        self.check_before_freeze(&columns)?;

        debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");

        // Creates the writer, data and offsets file
        let mut writer = NippyJarWriter::new(self)?;

        // Append rows to file while holding offsets in memory
        writer.append_rows(columns, total_rows)?;

        // Flushes configuration and offsets to disk
        writer.commit()?;

        debug!(target: "nippy-jar", ?writer, "Finished writing data.");

        Ok(writer.into_jar())
    }

    /// Safety checks before creating and returning a [`File`] handle to write data to.
    fn check_before_freeze(
        &self,
        columns: &[impl IntoIterator<Item = ColumnResult<Vec<u8>>>],
    ) -> Result<(), NippyJarError> {
        if columns.len() != self.columns {
            return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
        }

        if let Some(compression) = &self.compressor &&
            !compression.is_ready()
        {
            return Err(NippyJarError::CompressorNotReady)
        }

        Ok(())
    }
}

/// Manages the reading of static file data using memory-mapped files.
///
/// Holds file and mmap descriptors of the data and offsets files of a `static_file`.
#[derive(Debug)]
pub struct DataReader {
    /// Data file descriptor. Needs to be kept alive as long as `data_mmap` handle.
    #[expect(dead_code)]
    data_file: File,
    /// Mmap handle for data.
    data_mmap: Mmap,
    /// Offset file descriptor. Needs to be kept alive as long as `offset_mmap` handle.
    offset_file: File,
    /// Mmap handle for offsets.
    offset_mmap: Mmap,
    /// Number of bytes that represent one offset.
    offset_size: u8,
}

impl DataReader {
    /// Opens the respective data and offsets files and returns a [`DataReader`].
    pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
        let data_file = File::open(path.as_ref())?;
        // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
        let data_mmap = unsafe { Mmap::map(&data_file)? };

        let offset_file = File::open(path.as_ref().with_extension(OFFSETS_FILE_EXTENSION))?;
        // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
        let offset_mmap = unsafe { Mmap::map(&offset_file)? };

        // First byte is the size of one offset in bytes
        let offset_size = offset_mmap[0];

        // Ensure that the size of an offset is at most 8 bytes.
        if offset_size > 8 {
            return Err(NippyJarError::OffsetSizeTooBig { offset_size })
        } else if offset_size == 0 {
            return Err(NippyJarError::OffsetSizeTooSmall { offset_size })
        }

        Ok(Self { data_file, data_mmap, offset_file, offset_size, offset_mmap })
    }

    /// Returns the offset for the requested data index
    pub fn offset(&self, index: usize) -> Result<u64, NippyJarError> {
        // + 1 represents the offset_len u8 at the beginning of the file
        let from = index * self.offset_size as usize + 1;

        self.offset_at(from)
    }

    /// Returns the offset for the requested data index, counting from the end
    pub fn reverse_offset(&self, index: usize) -> Result<u64, NippyJarError> {
        let offsets_file_size = self.offset_file.metadata()?.len() as usize;

        if offsets_file_size > 1 {
            let from = offsets_file_size - self.offset_size as usize * (index + 1);

            self.offset_at(from)
        } else {
            Ok(0)
        }
    }

    /// Returns the total number of offsets in the file.
    /// The size of one offset is determined by the file itself.
    pub fn offsets_count(&self) -> Result<usize, NippyJarError> {
        Ok((self.offset_file.metadata()?.len().saturating_sub(1) / self.offset_size as u64)
            as usize)
    }

    /// Reads one offset-sized (determined by the offset file) u64 at the provided index.
    fn offset_at(&self, index: usize) -> Result<u64, NippyJarError> {
        let mut buffer: [u8; 8] = [0; 8];

        let offset_end = index.saturating_add(self.offset_size as usize);
        if offset_end > self.offset_mmap.len() {
            return Err(NippyJarError::OffsetOutOfBounds { index })
        }

        buffer[..self.offset_size as usize].copy_from_slice(&self.offset_mmap[index..offset_end]);
        Ok(u64::from_le_bytes(buffer))
    }

    /// Returns the number of bytes that represent one offset.
    pub const fn offset_size(&self) -> u8 {
        self.offset_size
    }

    /// Returns the underlying data as a slice of bytes for the provided range.
    pub fn data(&self, range: Range<usize>) -> &[u8] {
        &self.data_mmap[range]
    }

    /// Returns the total size of the data file.
    pub fn size(&self) -> usize {
        self.data_mmap.len()
    }

    /// Returns the total size of the offsets file.
    pub fn offsets_size(&self) -> usize {
        self.offset_mmap.len()
    }
}
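
// Worked example (illustrative, not part of the original source) of the offsets-file
// layout implied by the reads above: byte 0 stores `offset_size`, and offsets follow
// as little-endian integers of that width. With `offset_size = 8` and three stored
// offsets, the file is 1 + 3 * 8 = 25 bytes long, `offsets_count()` returns
// (25 - 1) / 8 = 3, and `offset(i)` reads 8 bytes starting at byte `1 + i * 8`.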

#[cfg(test)]
mod tests {
    use super::*;
    use compression::Compression;
    use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
    use std::{fs::OpenOptions, io::Read};

    type ColumnResults<T> = Vec<ColumnResult<T>>;
    type ColumnValues = Vec<Vec<u8>>;

    fn test_data(seed: Option<u64>) -> (ColumnValues, ColumnValues) {
        let value_length = 32;
        let num_rows = 100;

        let mut vec: Vec<u8> = vec![0; value_length];
        let mut rng = seed.map(SmallRng::seed_from_u64).unwrap_or_else(SmallRng::from_os_rng);

        let mut entry_gen = || {
            (0..num_rows)
                .map(|_| {
                    rng.fill_bytes(&mut vec[..]);
                    vec.clone()
                })
                .collect()
        };

        (entry_gen(), entry_gen())
    }

    fn clone_with_result(col: &ColumnValues) -> ColumnResults<Vec<u8>> {
        col.iter().map(|v| Ok(v.clone())).collect()
    }

    #[test]
    fn test_config_serialization() {
        let file = tempfile::NamedTempFile::new().unwrap();
        let jar = NippyJar::new_without_header(23, file.path()).with_lz4();
        jar.freeze_config().unwrap();

        let mut config_file = OpenOptions::new().read(true).open(jar.config_path()).unwrap();
        let config_file_len = config_file.metadata().unwrap().len();
        assert_eq!(config_file_len, 37);

        let mut buf = Vec::with_capacity(config_file_len as usize);
        config_file.read_to_end(&mut buf).unwrap();

        assert_eq!(
            vec![
                1, 0, 0, 0, 0, 0, 0, 0, 23, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0
            ],
            buf
        );

        let mut read_jar = bincode::deserialize_from::<_, NippyJar>(&buf[..]).unwrap();
        // Path is not ser/de
        read_jar.path = file.path().to_path_buf();
        assert_eq!(jar, read_jar);
    }

    #[test]
    fn test_zstd_with_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor_mut() {
            assert!(matches!(zstd.compressors(), Err(NippyJarError::CompressorNotReady)));

            // Make sure the number of column iterators matches the number configured at
            // construction.
            assert!(matches!(
                zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]),
                Err(NippyJarError::ColumnLenMismatch(columns, 3)) if columns == num_columns
            ));
        }

        // If ZSTD is enabled, do not write to the file unless the column dictionaries have been
        // calculated.
        assert!(matches!(
            nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows),
            Err(NippyJarError::CompressorNotReady)
        ));

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap();

        if let Some(Compressors::Zstd(zstd)) = &nippy.compressor() {
            assert!(matches!(
                (&zstd.state, zstd.dictionaries.as_ref().map(|dict| dict.len())),
                (compression::ZstdState::Ready, Some(columns)) if columns == num_columns
            ));
        }

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy.version, loaded_nippy.version);
        assert_eq!(nippy.columns, loaded_nippy.columns);
        assert_eq!(nippy.filter, loaded_nippy.filter);
        assert_eq!(nippy.phf, loaded_nippy.phf);
        assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
        assert_eq!(nippy.path, loaded_nippy.path);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(zstd.use_dict);
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    #[test]
    fn test_lz4() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_lz4();
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Lz4 compressor")
        }
    }

    #[test]
    fn test_zstd_no_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(!zstd.use_dict);

            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    /// Tests `NippyJar` with everything enabled.
    #[test]
    fn test_full_nippy_jar() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        let block_start = 500;

        #[derive(Serialize, Deserialize, Debug)]
        struct BlockJarHeader {
            block_start: usize,
        }

        // Create file
        {
            let mut nippy =
                NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
                    .with_zstd(true, 5000);

            nippy.prepare_compression(data.clone()).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read file
        {
            let loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();

            assert!(loaded_nippy.compressor().is_some());
            assert_eq!(loaded_nippy.user_header().block_start, block_start);

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Iterate over compressed values and compare
                let mut row_num = 0usize;
                while let Some(row) = cursor.next_row().unwrap() {
                    assert_eq!(
                        (row[0], row[1]),
                        (data[0][row_num].as_slice(), data[1][row_num].as_slice())
                    );
                    row_num += 1;
                }

                // Shuffled for chaos.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                for (row_num, (v0, v1)) in data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (v0, v1));
                }
            }
        }
    }

    #[test]
    fn test_selectable_column_values() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        // Create file
        {
            let mut nippy =
                NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
            nippy.prepare_compression(data).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read file
        {
            let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Shuffled for chaos.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                // Imagine `Blocks` static file has two columns: `Block | StoredWithdrawals`
                const BLOCKS_FULL_MASK: usize = 0b11;

                // Read both columns
                for (row_num, (v0, v1)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_FULL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (*v0, *v1));
                }

                // Read first column only: `Block`
                const BLOCKS_BLOCK_MASK: usize = 0b01;
                for (row_num, (v0, _)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_BLOCK_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v0);
                }

                // Read second column only: `StoredWithdrawals`
                const BLOCKS_WITHDRAWAL_MASK: usize = 0b10;
                for (row_num, (_, v1)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_WITHDRAWAL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v1);
                }

                // Read nothing
                const BLOCKS_EMPTY_MASK: usize = 0b00;
                for (row_num, _) in &data {
                    // Simulates `by_number` queries
                    assert!(cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_EMPTY_MASK)
                        .unwrap()
                        .unwrap()
                        .is_empty());
                }
            }
        }
    }

    #[test]
    fn test_writer() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Appends a third row and prunes two rows, to make sure we prune from both the
        // in-memory and on-disk offset lists
        prune_rows(num_columns, file_path.path(), &col1, &col2);

        // Should be able to append new rows
        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown before there's a chance to commit, and see that it
        // unwinds successfully
        test_append_consistency_no_commit(file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown during commit, and see that it unwinds successfully
        test_append_consistency_partial_commit(file_path.path(), &col1, &col2);
    }

    #[test]
    fn test_pruner() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let num_rows = 2;

        // (missing_offsets, expected number of rows)
        // If a row wasn't fully pruned, the consistency check should clean it up as well
        let missing_offsets_scenarios = [(1, 1), (2, 1), (3, 0)];

        for (missing_offsets, expected_rows) in missing_offsets_scenarios {
            let file_path = tempfile::NamedTempFile::new().unwrap();

            append_two_rows(num_columns, file_path.path(), &col1, &col2);

            simulate_interrupted_prune(num_columns, file_path.path(), num_rows, missing_offsets);

            let nippy = NippyJar::load_without_header(file_path.path()).unwrap();
            assert_eq!(nippy.rows, expected_rows);
        }
    }

    fn test_append_consistency_partial_commit(
        file_path: &Path,
        col1: &[Vec<u8>],
        col2: &[Vec<u8>],
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that should be unwound to
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a third row
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Make sure it doesn't write the final offset (which is the expected file data size)
        let _ = writer.offsets_mut().pop();

        // `commit_offsets` is not a public function. We call it here to simulate a shutdown
        // before it can flush `nippy.rows` (config) to disk.
        writer.commit_offsets().unwrap();

        // Simulate an unexpected shutdown of the writer, before it can finish commit()
        drop(writer);

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // Data was written successfully
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // It should be +16 bytes (two column offsets were added), but one offset is missing
        // (the one we popped), hence +8.
        assert_eq!(
            initial_offset_size + 8,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // Writer will execute a consistency check and verify first that the offset list on disk
        // doesn't match `nippy.rows`, and prune it. Then, it will prune the data file
        // accordingly as well.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_offset_size,
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn test_append_consistency_no_commit(file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that should be unwound to
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a third row, so we have an offset list in memory, which is not flushed to disk,
        // while the data has been.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Simulate an unexpected shutdown of the writer, before it can call commit()
        drop(writer);

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // Data was written successfully
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // Since offsets only get written on commit(), this remains the same
        assert_eq!(
            initial_offset_size,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // Writer will execute a consistency check, verify that the data file has more data than
        // it should, and reset it to the last offset of the list (on disk here)
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn append_two_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        // Create and add 1 row
        {
            let nippy = NippyJar::new_without_header(num_columns, file_path);
            nippy.freeze_config().unwrap();
            assert_eq!(nippy.max_row_size, 0);
            assert_eq!(nippy.rows, 0);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[0]))).unwrap();
            assert_eq!(writer.column(), 1);
            assert!(writer.is_dirty());

            writer.append_column(Some(Ok(&col2[0]))).unwrap();
            assert!(writer.is_dirty());

            // Adding last column of a row resets writer and updates jar config
            assert_eq!(writer.column(), 0);

            // One offset per column + 1 offset at the end representing the expected file data size
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();
            assert!(!writer.is_dirty());

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 1);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }

        // Load and add 1 row
        {
            let nippy = NippyJar::load_without_header(file_path).unwrap();
            // Check if it was committed successfully
            assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
            assert_eq!(nippy.rows, 1);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[1]))).unwrap();
            assert_eq!(writer.column(), 1);

            writer.append_column(Some(Ok(&col2[1]))).unwrap();

            // Adding last column of a row resets writer and updates jar config
            assert_eq!(writer.column(), 0);

            // One offset per column + 1 offset at the end representing the expected file data size
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 2);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }
    }

    fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let mut writer = NippyJarWriter::new(nippy).unwrap();

        // Appends a third row, so we have an offset list in memory, which is not flushed to disk
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();
        assert!(writer.is_dirty());

        // This should prune from both the in-memory and the on-disk offset lists
        writer.prune_rows(2).unwrap();
        assert_eq!(writer.rows(), 1);

        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
            1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
        );

        let expected_data_size = col1[0].len() + col2[0].len();
        assert_eq!(
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize,
            expected_data_size
        );

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        {
            let data_reader = nippy.open_data_reader().unwrap();
            // There are only two valid offsets, so index 2 actually represents the expected file
            // data size.
            assert_eq!(data_reader.offset(2).unwrap(), expected_data_size as u64);
        }

        // This should prune from the on-disk offset list and clear the jar.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.prune_rows(1).unwrap();
        assert!(writer.is_dirty());

        assert_eq!(writer.rows(), 0);
        assert_eq!(writer.max_row_size(), 0);
        assert_eq!(File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize, 0);
        // Offset size byte (1) + final offset (8) = 9 bytes
        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize,
            9
        );
        writer.commit().unwrap();
        assert!(!writer.is_dirty());
    }

    fn simulate_interrupted_prune(
        num_columns: usize,
        file_path: &Path,
        num_rows: u64,
        missing_offsets: u64,
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let reader = nippy.open_data_reader().unwrap();
        let offsets_file =
            OpenOptions::new().read(true).write(true).open(nippy.offsets_path()).unwrap();
        let offsets_len = 1 + num_rows * num_columns as u64 * 8 + 8;
        assert_eq!(offsets_len, offsets_file.metadata().unwrap().len());

        let data_file = OpenOptions::new().read(true).write(true).open(nippy.data_path()).unwrap();
        let data_len = reader.reverse_offset(0).unwrap();
        assert_eq!(data_len, data_file.metadata().unwrap().len());

        // Each data column is 32 bytes long. By truncating the data file, the
        // `consistency_check` will go through both branches:
        //   1. the offset list wasn't updated after clearing the data (data_len > last offset);
        //   2. fixing the above leads to an offset count that doesn't match the rows
        //      (* columns) of the configuration file.
        data_file.set_len(data_len - 32 * missing_offsets).unwrap();

        // Runs the consistency check.
        let _ = NippyJarWriter::new(nippy).unwrap();
    }
}