reth_nippy_jar/lib.rs

//! Immutable data store format.
//!
//! *Warning*: The `NippyJar` encoding format and its implementations are
//! designed for storing and retrieving data internally. They are not hardened
//! to safely read potentially malicious data.
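//!
//! A minimal usage sketch, mirroring the flow exercised by this crate's tests (the path
//! and values are illustrative only; `freeze_config` is crate-internal):
//!
//! ```ignore
//! use reth_nippy_jar::{NippyJar, NippyJarWriter};
//! use std::path::Path;
//!
//! // Create a two-column jar and persist its configuration file.
//! let jar = NippyJar::new_without_header(2, Path::new("data.jar"));
//! jar.freeze_config()?;
//!
//! // Append one row, column by column, then flush offsets and config to disk.
//! let mut writer = NippyJarWriter::new(jar)?;
//! writer.append_column(Some(Ok(&b"first"[..])))?;
//! writer.append_column(Some(Ok(&b"second"[..])))?;
//! writer.commit()?;
//! ```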

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]

use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
    error::Error as StdError,
    fs::File,
    io::{Read, Write},
    ops::Range,
    path::{Path, PathBuf},
};
use tracing::*;

/// Compression algorithms supported by `NippyJar`.
pub mod compression;
#[cfg(test)]
use compression::Compression;
use compression::Compressors;

/// Empty enum kept for backwards compatibility.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum Functions {}

/// Empty enum kept for backwards compatibility.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum InclusionFilters {}

mod error;
pub use error::NippyJarError;

mod cursor;
pub use cursor::NippyJarCursor;

mod writer;
pub use writer::NippyJarWriter;

mod consistency;
pub use consistency::NippyJarChecker;

/// The version number of the Nippy Jar format.
const NIPPY_JAR_VERSION: usize = 1;
/// The file extension used for index files.
const INDEX_FILE_EXTENSION: &str = "idx";
/// The file extension used for offsets files.
const OFFSETS_FILE_EXTENSION: &str = "off";
/// The file extension used for configuration files.
pub const CONFIG_FILE_EXTENSION: &str = "conf";

/// A [`RefRow`] is a list of column value slices pointing to either an internal buffer or a
/// memory-mapped file.
type RefRow<'a> = Vec<&'a [u8]>;

/// Alias type for a column value wrapped in `Result`.
pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;

/// A trait for the user-defined header of [`NippyJar`].
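///
/// Any type that is `Serialize + Deserialize + Debug + Send + Sync + 'static` gets this
/// trait through the blanket implementation below; for example, a header like the
/// `BlockJarHeader` used in this crate's tests:
///
/// ```ignore
/// #[derive(Debug, serde::Serialize, serde::Deserialize)]
/// struct BlockJarHeader {
///     block_start: usize,
/// }
/// ```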
pub trait NippyJarHeader:
    Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

// Blanket implementation for all types that implement the required traits.
impl<T> NippyJarHeader for T where
    T: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

/// `NippyJar` is a specialized storage format designed for immutable data.
///
/// Data is organized into a columnar format, enabling column-based compression. Data retrieval
/// entails consulting an offset list and fetching the data from file via `mmap`.
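///
/// On disk, a jar rooted at `{path}` keeps its satellite files alongside it as
/// `{path}.idx`, `{path}.off` and `{path}.conf` (see the extension constants above).
/// A rough sketch of the offsets file layout, as it is read back by [`DataReader`]:
///
/// ```text
/// byte 0   : size in bytes of a single offset (1..=8)
/// byte 1.. : little-endian offsets of `offset_size` bytes each, one per column value,
///            followed by one final offset holding the expected data file size
/// ```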
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NippyJar<H = ()> {
    /// The version of the `NippyJar` format.
    version: usize,
    /// User-defined header data.
    /// Defaults to the zero-sized unit type, i.e. no header data.
    user_header: H,
    /// Number of data columns in the jar.
    columns: usize,
    /// Number of data rows in the jar.
    rows: usize,
    /// Optional compression algorithm applied to the data.
    compressor: Option<Compressors>,
    #[serde(skip)]
    /// Optional field kept for backwards compatibility.
    filter: Option<InclusionFilters>,
    #[serde(skip)]
    /// Optional field kept for backwards compatibility.
    phf: Option<Functions>,
    /// Maximum uncompressed row size of the set. This enables decompression without any
    /// resizing of the output buffer.
    max_row_size: usize,
    /// Path of the data file. Supporting files are stored as `{path}.{extension}`.
    #[serde(skip)]
    path: PathBuf,
}

impl<H: NippyJarHeader> std::fmt::Debug for NippyJar<H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NippyJar")
            .field("version", &self.version)
            .field("user_header", &self.user_header)
            .field("rows", &self.rows)
            .field("columns", &self.columns)
            .field("compressor", &self.compressor)
            .field("filter", &self.filter)
            .field("phf", &self.phf)
            .field("path", &self.path)
            .field("max_row_size", &self.max_row_size)
            .finish_non_exhaustive()
    }
}

impl NippyJar<()> {
    /// Creates a new [`NippyJar`] without user-defined header data.
    pub fn new_without_header(columns: usize, path: &Path) -> Self {
        Self::new(columns, path, ())
    }

    /// Loads the file configuration and returns [`Self`] for a jar without user-defined
    /// header data.
    pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
        Self::load(path)
    }
}

impl<H: NippyJarHeader> NippyJar<H> {
    /// Creates a new [`NippyJar`] with user-defined header data.
    pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
        Self {
            version: NIPPY_JAR_VERSION,
            user_header,
            columns,
            rows: 0,
            max_row_size: 0,
            compressor: None,
            filter: None,
            phf: None,
            path: path.to_path_buf(),
        }
    }

    /// Adds [`compression::Zstd`] compression.
    pub fn with_zstd(mut self, use_dict: bool, max_dict_size: usize) -> Self {
        self.compressor =
            Some(Compressors::Zstd(compression::Zstd::new(use_dict, max_dict_size, self.columns)));
        self
    }

    /// Adds [`compression::Lz4`] compression.
    pub fn with_lz4(mut self) -> Self {
        self.compressor = Some(Compressors::Lz4(compression::Lz4::default()));
        self
    }

    /// Gets a reference to the user header.
    pub const fn user_header(&self) -> &H {
        &self.user_header
    }

    /// Gets total columns in jar.
    pub const fn columns(&self) -> usize {
        self.columns
    }

    /// Gets total rows in jar.
    pub const fn rows(&self) -> usize {
        self.rows
    }

    /// Gets a reference to the compressor.
    pub const fn compressor(&self) -> Option<&Compressors> {
        self.compressor.as_ref()
    }

    /// Gets a mutable reference to the compressor.
    pub const fn compressor_mut(&mut self) -> Option<&mut Compressors> {
        self.compressor.as_mut()
    }

    /// Loads the file configuration and returns [`Self`].
    ///
    /// **The user must ensure the header type matches the one used during the jar's creation.**
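    ///
    /// A sketch of loading a jar with a typed header (`BlockJarHeader` is a user-defined
    /// type like the one in this crate's tests):
    ///
    /// ```ignore
    /// use std::path::Path;
    ///
    /// let jar = NippyJar::<BlockJarHeader>::load(Path::new("data.jar"))?;
    /// let header = jar.user_header();
    /// ```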
    pub fn load(path: &Path) -> Result<Self, NippyJarError> {
        // Read [`Self`] from the config file located alongside the data file.
        let config_path = path.with_extension(CONFIG_FILE_EXTENSION);
        let config_file = File::open(&config_path)
            .inspect_err(|e| {
                warn!(?path, %e, "Failed to load static file jar");
            })
            .map_err(|err| reth_fs_util::FsPathError::open(err, config_path))?;

        let mut obj = Self::load_from_reader(config_file)?;
        obj.path = path.to_path_buf();
        Ok(obj)
    }

    /// Deserializes an instance of [`Self`] from a [`Read`] type.
    pub fn load_from_reader<R: Read>(reader: R) -> Result<Self, NippyJarError> {
        Ok(bincode::deserialize_from(reader)?)
    }

    /// Serializes an instance of [`Self`] to a [`Write`] type.
    pub fn save_to_writer<W: Write>(&self, writer: W) -> Result<(), NippyJarError> {
        Ok(bincode::serialize_into(writer, self)?)
    }

    /// Returns the path for the data file.
    pub fn data_path(&self) -> &Path {
        self.path.as_ref()
    }

    /// Returns the path for the index file.
    pub fn index_path(&self) -> PathBuf {
        self.path.with_extension(INDEX_FILE_EXTENSION)
    }

    /// Returns the path for the offsets file.
    pub fn offsets_path(&self) -> PathBuf {
        self.path.with_extension(OFFSETS_FILE_EXTENSION)
    }

    /// Returns the path for the config file.
    pub fn config_path(&self) -> PathBuf {
        self.path.with_extension(CONFIG_FILE_EXTENSION)
    }

    /// Deletes this [`NippyJar`] from disk, alongside every satellite file.
    pub fn delete(self) -> Result<(), NippyJarError> {
        // TODO(joshie): ensure consistency on unexpected shutdown

        for path in
            [self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()]
        {
            if path.exists() {
                debug!(target: "nippy-jar", ?path, "Removing file.");
                reth_fs_util::remove_file(path)?;
            }
        }

        Ok(())
    }

    /// Returns a [`DataReader`] for the data and offsets files.
    pub fn open_data_reader(&self) -> Result<DataReader, NippyJarError> {
        DataReader::new(self.data_path())
    }

    /// Writes all necessary configuration to file.
    fn freeze_config(&self) -> Result<(), NippyJarError> {
        Ok(reth_fs_util::atomic_write_file(&self.config_path(), |file| self.save_to_writer(file))?)
    }
}

#[cfg(test)]
impl<H: NippyJarHeader> NippyJar<H> {
    /// If required, prepares any compression algorithm with an early pass over the data.
    pub fn prepare_compression(
        &mut self,
        columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
    ) -> Result<(), NippyJarError> {
        // Makes any necessary preparations for the compressors
        if let Some(compression) = &mut self.compressor {
            debug!(target: "nippy-jar", columns=columns.len(), "Preparing compression.");
            compression.prepare_compression(columns)?;
        }
        Ok(())
    }

    /// Writes all data and configuration to a file and the offset index to another.
    pub fn freeze(
        self,
        columns: Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
        total_rows: u64,
    ) -> Result<Self, NippyJarError> {
        self.check_before_freeze(&columns)?;

        debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");

        // Creates the writer, data and offsets file
        let mut writer = NippyJarWriter::new(self)?;

        // Append rows to file while holding offsets in memory
        writer.append_rows(columns, total_rows)?;

        // Flushes configuration and offsets to disk
        writer.commit()?;

        debug!(target: "nippy-jar", ?writer, "Finished writing data.");

        Ok(writer.into_jar())
    }

    /// Safety checks before creating and returning a [`File`] handle to write data to.
    fn check_before_freeze(
        &self,
        columns: &[impl IntoIterator<Item = ColumnResult<Vec<u8>>>],
    ) -> Result<(), NippyJarError> {
        if columns.len() != self.columns {
            return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
        }

        if let Some(compression) = &self.compressor &&
            !compression.is_ready()
        {
            return Err(NippyJarError::CompressorNotReady)
        }

        Ok(())
    }
}

/// Manages the reading of static file data using memory-mapped files.
///
/// Holds file and mmap descriptors of the data and offsets files of a `static_file`.
#[derive(Debug)]
pub struct DataReader {
    /// Data file descriptor. Needs to be kept alive as long as `data_mmap` handle.
    #[expect(dead_code)]
    data_file: File,
    /// Mmap handle for data.
    data_mmap: Mmap,
    /// Offset file descriptor. Needs to be kept alive as long as `offset_mmap` handle.
    offset_file: File,
    /// Mmap handle for offsets.
    offset_mmap: Mmap,
    /// Number of bytes that represent one offset.
    offset_size: u8,
}

impl DataReader {
    /// Reads the respective data and offsets file and returns [`DataReader`].
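    ///
    /// A minimal usage sketch (the path is illustrative); consecutive offsets delimit one
    /// column value in the data file:
    ///
    /// ```ignore
    /// let reader = DataReader::new("data.jar")?;
    /// let start = reader.offset(0)? as usize;
    /// let end = reader.offset(1)? as usize;
    /// let first_value = reader.data(start..end);
    /// ```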
    pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
        let data_file = File::open(path.as_ref())?;
        // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
        let data_mmap = unsafe { Mmap::map(&data_file)? };

        let offset_file = File::open(path.as_ref().with_extension(OFFSETS_FILE_EXTENSION))?;
        // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
        let offset_mmap = unsafe { Mmap::map(&offset_file)? };

        // First byte is the size of one offset in bytes
        let offset_size = offset_mmap[0];

        // Ensure that the size of an offset is at least 1 byte and at most 8 bytes.
        if offset_size > 8 {
            return Err(NippyJarError::OffsetSizeTooBig { offset_size })
        } else if offset_size == 0 {
            return Err(NippyJarError::OffsetSizeTooSmall { offset_size })
        }

        Ok(Self { data_file, data_mmap, offset_file, offset_size, offset_mmap })
    }

    /// Returns the offset for the requested data index.
    pub fn offset(&self, index: usize) -> Result<u64, NippyJarError> {
        // + 1 accounts for the `offset_size` u8 at the beginning of the file
        let from = index * self.offset_size as usize + 1;

        self.offset_at(from)
    }

    /// Returns the offset for the requested data index, counting from the end of the file.
    pub fn reverse_offset(&self, index: usize) -> Result<u64, NippyJarError> {
        let offsets_file_size = self.offset_file.metadata()?.len() as usize;

        if offsets_file_size > 1 {
            let from = offsets_file_size - self.offset_size as usize * (index + 1);

            self.offset_at(from)
        } else {
            Ok(0)
        }
    }

    /// Returns total number of offsets in the file.
    /// The size of one offset is determined by the file itself.
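    ///
    /// For example, with `offset_size == 8`, a 25-byte offsets file holds
    /// `(25 - 1) / 8 == 3` offsets, since the leading byte only stores the offset size.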
    pub fn offsets_count(&self) -> Result<usize, NippyJarError> {
        Ok((self.offset_file.metadata()?.len().saturating_sub(1) / self.offset_size as u64)
            as usize)
    }

    /// Reads one offset-sized (see `offset_size`) `u64` at the provided byte index.
    fn offset_at(&self, index: usize) -> Result<u64, NippyJarError> {
        let mut buffer: [u8; 8] = [0; 8];

        let offset_end = index.saturating_add(self.offset_size as usize);
        if offset_end > self.offset_mmap.len() {
            return Err(NippyJarError::OffsetOutOfBounds { index })
        }

        buffer[..self.offset_size as usize].copy_from_slice(&self.offset_mmap[index..offset_end]);
        Ok(u64::from_le_bytes(buffer))
    }

    /// Returns number of bytes that represent one offset.
    pub const fn offset_size(&self) -> u8 {
        self.offset_size
    }

    /// Returns the underlying data as a slice of bytes for the provided range.
    pub fn data(&self, range: Range<usize>) -> &[u8] {
        &self.data_mmap[range]
    }

    /// Returns the total size of the data.
    pub fn size(&self) -> usize {
        self.data_mmap.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use compression::Compression;
    use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
    use std::{fs::OpenOptions, io::Read};

    type ColumnResults<T> = Vec<ColumnResult<T>>;
    type ColumnValues = Vec<Vec<u8>>;

    fn test_data(seed: Option<u64>) -> (ColumnValues, ColumnValues) {
        let value_length = 32;
        let num_rows = 100;

        let mut vec: Vec<u8> = vec![0; value_length];
        let mut rng = seed.map(SmallRng::seed_from_u64).unwrap_or_else(SmallRng::from_os_rng);

        let mut entry_gen = || {
            (0..num_rows)
                .map(|_| {
                    rng.fill_bytes(&mut vec[..]);
                    vec.clone()
                })
                .collect()
        };

        (entry_gen(), entry_gen())
    }

    fn clone_with_result(col: &ColumnValues) -> ColumnResults<Vec<u8>> {
        col.iter().map(|v| Ok(v.clone())).collect()
    }

    #[test]
    fn test_config_serialization() {
        let file = tempfile::NamedTempFile::new().unwrap();
        let jar = NippyJar::new_without_header(23, file.path()).with_lz4();
        jar.freeze_config().unwrap();

        let mut config_file = OpenOptions::new().read(true).open(jar.config_path()).unwrap();
        let config_file_len = config_file.metadata().unwrap().len();
        assert_eq!(config_file_len, 37);

        let mut buf = Vec::with_capacity(config_file_len as usize);
        config_file.read_to_end(&mut buf).unwrap();

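        // A hedged decoding of the 37 bytes below, assuming bincode's default fixed-int
        // little-endian encoding: version = 1 (8 bytes), columns = 23 (8 bytes),
        // rows = 0 (8 bytes), compressor = Some(Lz4) (1-byte `Some` tag + 4-byte variant
        // index), max_row_size = 0 (8 bytes). `filter`, `phf` and `path` are skipped.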
        assert_eq!(
            vec![
                1, 0, 0, 0, 0, 0, 0, 0, 23, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0
            ],
            buf
        );

        let mut read_jar = bincode::deserialize_from::<_, NippyJar>(&buf[..]).unwrap();
        // Path is not ser/de
        read_jar.path = file.path().to_path_buf();
        assert_eq!(jar, read_jar);
    }

    #[test]
    fn test_zstd_with_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor_mut() {
            assert!(matches!(zstd.compressors(), Err(NippyJarError::CompressorNotReady)));

            // Make sure the number of column iterators matches the initially configured one.
            assert!(matches!(
                zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]),
                Err(NippyJarError::ColumnLenMismatch(columns, 3)) if columns == num_columns
            ));
        }

        // If ZSTD is enabled, do not write to the file unless the column dictionaries have been
        // calculated.
        assert!(matches!(
            nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows),
            Err(NippyJarError::CompressorNotReady)
        ));

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap();

        if let Some(Compressors::Zstd(zstd)) = &nippy.compressor() {
            assert!(matches!(
                (&zstd.state, zstd.dictionaries.as_ref().map(|dict| dict.len())),
                (compression::ZstdState::Ready, Some(columns)) if columns == num_columns
            ));
        }

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy.version, loaded_nippy.version);
        assert_eq!(nippy.columns, loaded_nippy.columns);
        assert_eq!(nippy.filter, loaded_nippy.filter);
        assert_eq!(nippy.phf, loaded_nippy.phf);
        assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
        assert_eq!(nippy.path, loaded_nippy.path);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(zstd.use_dict);
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    #[test]
    fn test_lz4() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_lz4();
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Lz4 compressor")
        }
    }

    #[test]
    fn test_zstd_no_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(!zstd.use_dict);

            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    /// Tests `NippyJar` with everything enabled.
    #[test]
    fn test_full_nippy_jar() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        let block_start = 500;

        #[derive(Serialize, Deserialize, Debug)]
        struct BlockJarHeader {
            block_start: usize,
        }

        // Create file
        {
            let mut nippy =
                NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
                    .with_zstd(true, 5000);

            nippy.prepare_compression(data.clone()).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read file
        {
            let loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();

            assert!(loaded_nippy.compressor().is_some());
            assert_eq!(loaded_nippy.user_header().block_start, block_start);

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Iterate over compressed values and compare
                let mut row_num = 0usize;
                while let Some(row) = cursor.next_row().unwrap() {
                    assert_eq!(
                        (row[0], row[1]),
                        (data[0][row_num].as_slice(), data[1][row_num].as_slice())
                    );
                    row_num += 1;
                }

                // Shuffled for chaos.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                for (row_num, (v0, v1)) in data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (v0, v1));
                }
            }
        }
    }

    #[test]
    fn test_selectable_column_values() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        // Create file
        {
            let mut nippy =
                NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
            nippy.prepare_compression(data).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read file
        {
            let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Shuffled for chaos.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                // Imagine `Blocks` static file has two columns: `Block | StoredWithdrawals`
                const BLOCKS_FULL_MASK: usize = 0b11;

                // Read both columns
                for (row_num, (v0, v1)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_FULL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (*v0, *v1));
                }

                // Read first column only: `Block`
                const BLOCKS_BLOCK_MASK: usize = 0b01;
                for (row_num, (v0, _)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_BLOCK_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v0);
                }

                // Read second column only: `StoredWithdrawals`
                const BLOCKS_WITHDRAWAL_MASK: usize = 0b10;
                for (row_num, (_, v1)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_WITHDRAWAL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v1);
                }

                // Read nothing
                const BLOCKS_EMPTY_MASK: usize = 0b00;
                for (row_num, _) in &data {
                    // Simulates `by_number` queries
                    assert!(cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_EMPTY_MASK)
                        .unwrap()
                        .unwrap()
                        .is_empty());
                }
            }
        }
    }

    #[test]
    fn test_writer() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Appends a third row and prunes two rows, to make sure we prune from both the in-memory
        // and on-disk offset lists
        prune_rows(num_columns, file_path.path(), &col1, &col2);

        // Should be able to append new rows
        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown before there's a chance to commit, and see that it
        // unwinds successfully
        test_append_consistency_no_commit(file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown during commit, and see that it unwinds successfully
        test_append_consistency_partial_commit(file_path.path(), &col1, &col2);
    }

    #[test]
    fn test_pruner() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let num_rows = 2;

        // (missing_offsets, expected number of rows)
        // If a row wasn't fully pruned, it should be cleaned up as well
        let missing_offsets_scenarios = [(1, 1), (2, 1), (3, 0)];

        for (missing_offsets, expected_rows) in missing_offsets_scenarios {
            let file_path = tempfile::NamedTempFile::new().unwrap();

            append_two_rows(num_columns, file_path.path(), &col1, &col2);

            simulate_interrupted_prune(num_columns, file_path.path(), num_rows, missing_offsets);

            let nippy = NippyJar::load_without_header(file_path.path()).unwrap();
            assert_eq!(nippy.rows, expected_rows);
        }
    }

    fn test_append_consistency_partial_commit(
        file_path: &Path,
        col1: &[Vec<u8>],
        col2: &[Vec<u8>],
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that should be unwound to
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a third row
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Makes sure it doesn't write the last offset (which is the expected data file size)
        let _ = writer.offsets_mut().pop();

        // `commit_offsets` is not a pub function. We call it here to simulate a shutdown before
        // it can flush `nippy.rows` (config) to disk.
        writer.commit_offsets().unwrap();

        // Simulate an unexpected shutdown of the writer, before it can finish commit()
        drop(writer);

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // Data was written successfully
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // It should be + 16 (two columns were added), but one offset is missing (the one we
        // popped above)
        assert_eq!(
            initial_offset_size + 8,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // The writer will run a consistency check, verify first that the on-disk offset list
        // doesn't match `nippy.rows`, and prune it. Then it will prune the data file accordingly
        // as well.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_offset_size,
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn test_append_consistency_no_commit(file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that should be unwound to
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a third row, so we have an offset list in memory, which is not flushed to disk,
        // while the data has been.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Simulate an unexpected shutdown of the writer, before it can call commit()
        drop(writer);

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // Data was written successfully
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // Since offsets only get written on commit(), this remains the same
        assert_eq!(
            initial_offset_size,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // The writer will run a consistency check, verify that the data file has more data than
        // it should, and truncate it to the last offset of the on-disk list.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn append_two_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        // Create and add 1 row
        {
            let nippy = NippyJar::new_without_header(num_columns, file_path);
            nippy.freeze_config().unwrap();
            assert_eq!(nippy.max_row_size, 0);
            assert_eq!(nippy.rows, 0);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[0]))).unwrap();
            assert_eq!(writer.column(), 1);
            assert!(writer.is_dirty());

            writer.append_column(Some(Ok(&col2[0]))).unwrap();
            assert!(writer.is_dirty());

            // Adding the last column of a row resets the writer and updates the jar config
            assert_eq!(writer.column(), 0);

            // One offset per column + 1 offset at the end representing the expected data file size
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();
            assert!(!writer.is_dirty());

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 1);
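            // Expected offsets file length, assuming 8-byte offsets: 1 size byte +
            // (1 row * 2 columns) * 8 bytes + 1 final offset * 8 bytes = 25 bytes.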
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }

        // Load and add 1 row
        {
            let nippy = NippyJar::load_without_header(file_path).unwrap();
            // Check if it was committed successfully
            assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
            assert_eq!(nippy.rows, 1);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[1]))).unwrap();
            assert_eq!(writer.column(), 1);

            writer.append_column(Some(Ok(&col2[1]))).unwrap();

            // Adding the last column of a row resets the writer and updates the jar config
            assert_eq!(writer.column(), 0);

            // One offset per column + 1 offset at the end representing the expected data file size
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 2);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }
    }

    fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let mut writer = NippyJarWriter::new(nippy).unwrap();

        // Appends a third row, so we have an offset list in memory, which is not flushed to disk
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();
        assert!(writer.is_dirty());

        // This should prune from both the in-memory and on-disk offset lists
        writer.prune_rows(2).unwrap();
        assert_eq!(writer.rows(), 1);

        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
            1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
        );

        let expected_data_size = col1[0].len() + col2[0].len();
        assert_eq!(
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize,
            expected_data_size
        );

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        {
            let data_reader = nippy.open_data_reader().unwrap();
            // There are only two valid offsets, so index 2 actually represents the expected file
            // data size.
            assert_eq!(data_reader.offset(2).unwrap(), expected_data_size as u64);
        }

        // This should prune from the on-disk offset list and clear the jar.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.prune_rows(1).unwrap();
        assert!(writer.is_dirty());

        assert_eq!(writer.rows(), 0);
        assert_eq!(writer.max_row_size(), 0);
        assert_eq!(File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize, 0);
        // Offset size byte (1) + final offset (8) = 9 bytes
        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize,
            9
        );
        writer.commit().unwrap();
        assert!(!writer.is_dirty());
    }

    fn simulate_interrupted_prune(
        num_columns: usize,
        file_path: &Path,
        num_rows: u64,
        missing_offsets: u64,
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let reader = nippy.open_data_reader().unwrap();
        let offsets_file =
            OpenOptions::new().read(true).write(true).open(nippy.offsets_path()).unwrap();
        let offsets_len = 1 + num_rows * num_columns as u64 * 8 + 8;
        assert_eq!(offsets_len, offsets_file.metadata().unwrap().len());

        let data_file = OpenOptions::new().read(true).write(true).open(nippy.data_path()).unwrap();
        let data_len = reader.reverse_offset(0).unwrap();
        assert_eq!(data_len, data_file.metadata().unwrap().len());

        // Each data column is 32 bytes long.
        // By truncating the data file, the `consistency_check` will go through both branches:
        //   1. The offset list wasn't updated after clearing the data (data_len > last offset).
        //   2. Fixing the above leads to an offset count that no longer matches the rows
        //      (* columns) of the configuration file.
        data_file.set_len(data_len - 32 * missing_offsets).unwrap();

        // Runs the consistency check.
        let _ = NippyJarWriter::new(nippy).unwrap();
    }
}