reth_nippy_jar/
lib.rs

//! Immutable data store format.
//!
//! *Warning*: The `NippyJar` encoding format and its implementations are
//! designed for storing and retrieving data internally. They are not hardened
//! to safely read potentially malicious data.

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]

use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
    error::Error as StdError,
    fs::File,
    io::{self, Read, Write},
    ops::Range,
    path::{Path, PathBuf},
};
use tracing::*;

/// Compression algorithms supported by `NippyJar`.
pub mod compression;
#[cfg(test)]
use compression::Compression;
use compression::Compressors;
/// Empty enum for backwards compatibility.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum Functions {}

/// Empty enum for backwards compatibility.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum InclusionFilters {}

mod error;
pub use error::NippyJarError;

mod cursor;
pub use cursor::NippyJarCursor;

mod writer;
pub use writer::NippyJarWriter;

mod consistency;
pub use consistency::NippyJarChecker;

/// The version number of the Nippy Jar format.
const NIPPY_JAR_VERSION: usize = 1;
/// The file extension used for index files.
const INDEX_FILE_EXTENSION: &str = "idx";
/// The file extension used for offsets files.
const OFFSETS_FILE_EXTENSION: &str = "off";
/// The file extension used for configuration files.
pub const CONFIG_FILE_EXTENSION: &str = "conf";

/// A [`RefRow`] is a list of column value slices pointing to either an internal buffer or a
/// memory-mapped file.
type RefRow<'a> = Vec<&'a [u8]>;

/// Alias type for a column value wrapped in `Result`.
pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;

/// A trait for the user-defined header of [`NippyJar`].
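///
/// Any type satisfying these bounds can serve as a header. A minimal sketch
/// (`BlockMeta` is a hypothetical example type, not part of this crate):
///
/// ```ignore
/// #[derive(Debug, Serialize, Deserialize)]
/// struct BlockMeta {
///     block_start: u64,
/// }
///
/// let jar = NippyJar::new(2, path.as_ref(), BlockMeta { block_start: 500 });
/// ```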
pub trait NippyJarHeader:
    Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

// Blanket implementation for all types that implement the required traits.
impl<T> NippyJarHeader for T where
    T: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

/// `NippyJar` is a specialized storage format designed for immutable data.
///
/// Data is organized into a columnar format, enabling column-based compression. Data retrieval
/// entails consulting an offset list and fetching the data from file via `mmap`.
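///
/// A minimal write/read sketch, assuming a two-column jar (`path` and the column
/// bytes are placeholders; error handling elided):
///
/// ```ignore
/// // Write one row (two columns) via the writer, then commit.
/// let jar = NippyJar::new_without_header(2, path.as_ref());
/// let mut writer = NippyJarWriter::new(jar)?;
/// writer.append_column(Some(Ok(&b"first column value"[..])))?;
/// writer.append_column(Some(Ok(&b"second column value"[..])))?;
/// writer.commit()?;
///
/// // Reload the configuration and iterate rows through a cursor.
/// let jar = NippyJar::load_without_header(path.as_ref())?;
/// let mut cursor = NippyJarCursor::new(&jar)?;
/// while let Some(row) = cursor.next_row()? {
///     // `row` is a list of byte slices, one per column.
/// }
/// ```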
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NippyJar<H = ()> {
    /// The version of the `NippyJar` format.
    version: usize,
    /// User-defined header data.
    /// Default: zero-sized unit type, i.e. no header data.
    user_header: H,
    /// Number of data columns in the jar.
    columns: usize,
    /// Number of data rows in the jar.
    rows: usize,
    /// Optional compression algorithm applied to the data.
    compressor: Option<Compressors>,
    #[serde(skip)]
    /// Optional field for backwards compatibility.
    filter: Option<InclusionFilters>,
    #[serde(skip)]
    /// Optional field for backwards compatibility.
    phf: Option<Functions>,
    /// Maximum uncompressed row size of the set. This enables decompression without any
    /// resizing of the output buffer.
    max_row_size: usize,
    /// Data path for the file. Supporting files will have the format `{path}.{extension}`.
    #[serde(skip)]
    path: PathBuf,
}

impl<H: NippyJarHeader> std::fmt::Debug for NippyJar<H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NippyJar")
            .field("version", &self.version)
            .field("user_header", &self.user_header)
            .field("rows", &self.rows)
            .field("columns", &self.columns)
            .field("compressor", &self.compressor)
            .field("filter", &self.filter)
            .field("phf", &self.phf)
            .field("path", &self.path)
            .field("max_row_size", &self.max_row_size)
            .finish_non_exhaustive()
    }
}

impl NippyJar<()> {
    /// Creates a new [`NippyJar`] without user-defined header data.
    pub fn new_without_header(columns: usize, path: &Path) -> Self {
        Self::new(columns, path, ())
    }

    /// Loads the file configuration and returns [`Self`] for a jar without user-defined header
    /// data.
    pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
        Self::load(path)
    }
}

impl<H: NippyJarHeader> NippyJar<H> {
    /// Creates a new [`NippyJar`] with user-defined header data.
    pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
        Self {
            version: NIPPY_JAR_VERSION,
            user_header,
            columns,
            rows: 0,
            max_row_size: 0,
            compressor: None,
            filter: None,
            phf: None,
            path: path.to_path_buf(),
        }
    }

    /// Adds [`compression::Zstd`] compression.
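    ///
    /// A usage sketch (5000 is an arbitrary example dictionary size; with
    /// `use_dict = true`, dictionaries must be prepared before freezing data):
    ///
    /// ```ignore
    /// let jar = NippyJar::new_without_header(2, path.as_ref()).with_zstd(true, 5000);
    /// ```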
    pub fn with_zstd(mut self, use_dict: bool, max_dict_size: usize) -> Self {
        self.compressor =
            Some(Compressors::Zstd(compression::Zstd::new(use_dict, max_dict_size, self.columns)));
        self
    }

    /// Adds [`compression::Lz4`] compression.
    pub fn with_lz4(mut self) -> Self {
        self.compressor = Some(Compressors::Lz4(compression::Lz4::default()));
        self
    }

    /// Gets a reference to the user header.
    pub const fn user_header(&self) -> &H {
        &self.user_header
    }

    /// Gets total columns in jar.
    pub const fn columns(&self) -> usize {
        self.columns
    }

    /// Gets total rows in jar.
    pub const fn rows(&self) -> usize {
        self.rows
    }

    /// Gets a reference to the compressor.
    pub const fn compressor(&self) -> Option<&Compressors> {
        self.compressor.as_ref()
    }

    /// Gets a mutable reference to the compressor.
    pub const fn compressor_mut(&mut self) -> Option<&mut Compressors> {
        self.compressor.as_mut()
    }

    /// Loads the file configuration and returns [`Self`].
    ///
    /// **The user must ensure the header type matches the one used during the jar's creation.**
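    ///
    /// For a jar created with a typed header, load with the same type parameter
    /// (a sketch; `BlockMeta` is a hypothetical header type):
    ///
    /// ```ignore
    /// let jar = NippyJar::<BlockMeta>::load(path.as_ref())?;
    /// ```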
    pub fn load(path: &Path) -> Result<Self, NippyJarError> {
        // Read [`Self`] from the config file located alongside the data file.
        let config_path = path.with_extension(CONFIG_FILE_EXTENSION);
        let config_file = File::open(&config_path)
            .inspect_err(|e| {
                warn!(?path, %e, "Failed to load static file jar");
            })
            .map_err(|err| reth_fs_util::FsPathError::open(err, config_path))?;

        let mut obj = Self::load_from_reader(io::BufReader::new(config_file))?;
        obj.path = path.to_path_buf();
        Ok(obj)
    }

    /// Deserializes an instance of [`Self`] from a [`Read`] type.
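    ///
    /// Configurations round-trip through [`Self::save_to_writer`]; note that `path`
    /// is `#[serde(skip)]` and must be restored by the caller. A sketch:
    ///
    /// ```ignore
    /// let mut buf = Vec::new();
    /// jar.save_to_writer(&mut buf)?;
    /// let read_back = NippyJar::<()>::load_from_reader(&buf[..])?;
    /// // `read_back.path` is empty here; `load` restores it from its argument.
    /// ```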
    pub fn load_from_reader<R: Read>(reader: R) -> Result<Self, NippyJarError> {
        Ok(bincode::deserialize_from(reader)?)
    }

    /// Serializes an instance of [`Self`] to a [`Write`] type.
    pub fn save_to_writer<W: Write>(&self, writer: W) -> Result<(), NippyJarError> {
        Ok(bincode::serialize_into(writer, self)?)
    }

    /// Returns the path for the data file.
    pub fn data_path(&self) -> &Path {
        self.path.as_ref()
    }

    /// Returns the path for the index file.
    pub fn index_path(&self) -> PathBuf {
        self.path.with_extension(INDEX_FILE_EXTENSION)
    }

    /// Returns the path for the offsets file.
    pub fn offsets_path(&self) -> PathBuf {
        self.path.with_extension(OFFSETS_FILE_EXTENSION)
    }

    /// Returns the path for the config file.
    pub fn config_path(&self) -> PathBuf {
        self.path.with_extension(CONFIG_FILE_EXTENSION)
    }

    /// Deletes this [`NippyJar`] from disk, along with every satellite file.
    pub fn delete(self) -> Result<(), NippyJarError> {
        // TODO(joshie): ensure consistency on unexpected shutdown

        for path in
            [self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()]
        {
            if path.exists() {
                debug!(target: "nippy-jar", ?path, "Removing file.");
                reth_fs_util::remove_file(path)?;
            }
        }

        Ok(())
    }

    /// Returns a [`DataReader`] for the data and offsets files.
    pub fn open_data_reader(&self) -> Result<DataReader, NippyJarError> {
        DataReader::new(self.data_path())
    }

    /// Writes all necessary configuration to file.
    fn freeze_config(&self) -> Result<(), NippyJarError> {
        Ok(reth_fs_util::atomic_write_file(&self.config_path(), |file| self.save_to_writer(file))?)
    }
}

#[cfg(test)]
impl<H: NippyJarHeader> NippyJar<H> {
    /// If required, prepares any compression algorithm for an early pass over the data.
    pub fn prepare_compression(
        &mut self,
        columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
    ) -> Result<(), NippyJarError> {
        // Makes any necessary preparations for the compressors
        if let Some(compression) = &mut self.compressor {
            debug!(target: "nippy-jar", columns=columns.len(), "Preparing compression.");
            compression.prepare_compression(columns)?;
        }
        Ok(())
    }

    /// Writes all data and configuration to a file and the offset index to another.
    pub fn freeze(
        self,
        columns: Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
        total_rows: u64,
    ) -> Result<Self, NippyJarError> {
        self.check_before_freeze(&columns)?;

        debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");

        // Creates the writer, data and offsets file
        let mut writer = NippyJarWriter::new(self)?;

        // Append rows to file while holding offsets in memory
        writer.append_rows(columns, total_rows)?;

        // Flushes configuration and offsets to disk
        writer.commit()?;

        debug!(target: "nippy-jar", ?writer, "Finished writing data.");

        Ok(writer.into_jar())
    }

    /// Safety checks before creating and returning a [`File`] handle to write data to.
    fn check_before_freeze(
        &self,
        columns: &[impl IntoIterator<Item = ColumnResult<Vec<u8>>>],
    ) -> Result<(), NippyJarError> {
        if columns.len() != self.columns {
            return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
        }

        if let Some(compression) = &self.compressor &&
            !compression.is_ready()
        {
            return Err(NippyJarError::CompressorNotReady)
        }

        Ok(())
    }
}

/// Manages the reading of static file data using memory-mapped files.
///
/// Holds file and mmap descriptors of the data and offsets files of a `static_file`.
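///
/// The offsets file layout, as parsed below and exercised by this crate's tests,
/// is: a single leading byte holding the width in bytes of each offset, followed
/// by little-endian offsets of that width; the final offset records the expected
/// size of the data file.
///
/// ```text
/// [ offset_size: 1 byte | offset 0 | offset 1 | ... | expected data file size ]
/// ```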
#[derive(Debug)]
pub struct DataReader {
    /// Data file descriptor. Needs to be kept alive as long as the `data_mmap` handle.
    #[expect(dead_code)]
    data_file: File,
    /// Mmap handle for data.
    data_mmap: Mmap,
    /// Offset file descriptor. Needs to be kept alive as long as the `offset_mmap` handle.
    offset_file: File,
    /// Mmap handle for offsets.
    offset_mmap: Mmap,
    /// Number of bytes that represent one offset.
    offset_size: u8,
}

impl DataReader {
    /// Reads the respective data and offsets files and returns a [`DataReader`].
    pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
        let data_file = File::open(path.as_ref())?;
        // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
        let data_mmap = unsafe { Mmap::map(&data_file)? };

        let offset_file = File::open(path.as_ref().with_extension(OFFSETS_FILE_EXTENSION))?;
        // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
        let offset_mmap = unsafe { Mmap::map(&offset_file)? };

        // The first byte is the size of one offset in bytes.
        let offset_size = offset_mmap[0];

        // Ensure that the size of an offset is at most 8 bytes.
        if offset_size > 8 {
            return Err(NippyJarError::OffsetSizeTooBig { offset_size })
        } else if offset_size == 0 {
            return Err(NippyJarError::OffsetSizeTooSmall { offset_size })
        }

        Ok(Self { data_file, data_mmap, offset_file, offset_size, offset_mmap })
    }

    /// Returns the offset for the requested data index.
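    ///
    /// For example, with an `offset_size` of 8, the offset for index `i` is read from
    /// bytes `[1 + i * 8, 1 + (i + 1) * 8)` of the offsets file.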
    pub fn offset(&self, index: usize) -> Result<u64, NippyJarError> {
        // The + 1 accounts for the `offset_size` u8 at the beginning of the file.
        let from = index * self.offset_size as usize + 1;

        self.offset_at(from)
    }

    /// Returns the offset for the requested data index, starting from the end.
    pub fn reverse_offset(&self, index: usize) -> Result<u64, NippyJarError> {
        let offsets_file_size = self.offset_file.metadata()?.len() as usize;

        if offsets_file_size > 1 {
            let from = offsets_file_size - self.offset_size as usize * (index + 1);

            self.offset_at(from)
        } else {
            Ok(0)
        }
    }

    /// Returns the total number of offsets in the file.
    /// The size of one offset is determined by the file itself.
    pub fn offsets_count(&self) -> Result<usize, NippyJarError> {
        Ok((self.offset_file.metadata()?.len().saturating_sub(1) / self.offset_size as u64)
            as usize)
    }

    /// Reads one offset-sized (as determined by the offsets file) `u64` at the provided byte
    /// index.
    fn offset_at(&self, index: usize) -> Result<u64, NippyJarError> {
        let mut buffer: [u8; 8] = [0; 8];

        let offset_end = index.saturating_add(self.offset_size as usize);
        if offset_end > self.offset_mmap.len() {
            return Err(NippyJarError::OffsetOutOfBounds { index })
        }

        buffer[..self.offset_size as usize].copy_from_slice(&self.offset_mmap[index..offset_end]);
        Ok(u64::from_le_bytes(buffer))
    }

    /// Returns the number of bytes that represent one offset.
    pub const fn offset_size(&self) -> u8 {
        self.offset_size
    }

    /// Returns the underlying data as a slice of bytes for the provided range.
    pub fn data(&self, range: Range<usize>) -> &[u8] {
        &self.data_mmap[range]
    }

    /// Returns the total size of the data file.
    pub fn size(&self) -> usize {
        self.data_mmap.len()
    }

    /// Returns the total size of the offsets file.
    pub fn offsets_size(&self) -> usize {
        self.offset_mmap.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use compression::Compression;
    use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
    use std::{fs::OpenOptions, io::Read};

    type ColumnResults<T> = Vec<ColumnResult<T>>;
    type ColumnValues = Vec<Vec<u8>>;

    fn test_data(seed: Option<u64>) -> (ColumnValues, ColumnValues) {
        let value_length = 32;
        let num_rows = 100;

        let mut vec: Vec<u8> = vec![0; value_length];
        let mut rng = seed.map(SmallRng::seed_from_u64).unwrap_or_else(SmallRng::from_os_rng);

        let mut entry_gen = || {
            (0..num_rows)
                .map(|_| {
                    rng.fill_bytes(&mut vec[..]);
                    vec.clone()
                })
                .collect()
        };

        (entry_gen(), entry_gen())
    }

    fn clone_with_result(col: &ColumnValues) -> ColumnResults<Vec<u8>> {
        col.iter().map(|v| Ok(v.clone())).collect()
    }

    #[test]
    fn test_config_serialization() {
        let file = tempfile::NamedTempFile::new().unwrap();
        let jar = NippyJar::new_without_header(23, file.path()).with_lz4();
        jar.freeze_config().unwrap();

        let mut config_file = OpenOptions::new().read(true).open(jar.config_path()).unwrap();
        let config_file_len = config_file.metadata().unwrap().len();
        assert_eq!(config_file_len, 37);

        let mut buf = Vec::with_capacity(config_file_len as usize);
        config_file.read_to_end(&mut buf).unwrap();

        assert_eq!(
            vec![
                1, 0, 0, 0, 0, 0, 0, 0, 23, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0
            ],
            buf
        );

        let mut read_jar = bincode::deserialize_from::<_, NippyJar>(&buf[..]).unwrap();
        // Path is not ser/de
        read_jar.path = file.path().to_path_buf();
        assert_eq!(jar, read_jar);
    }

    #[test]
    fn test_zstd_with_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor_mut() {
            assert!(matches!(zstd.compressors(), Err(NippyJarError::CompressorNotReady)));

            // Make sure the number of column iterators matches the number set up initially.
            assert!(matches!(
                zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]),
                Err(NippyJarError::ColumnLenMismatch(columns, 3)) if columns == num_columns
            ));
        }

        // If ZSTD is enabled, do not write to the file unless the column dictionaries have been
        // calculated.
        assert!(matches!(
            nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows),
            Err(NippyJarError::CompressorNotReady)
        ));

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap();

        if let Some(Compressors::Zstd(zstd)) = &nippy.compressor() {
            assert!(matches!(
                (&zstd.state, zstd.dictionaries.as_ref().map(|dict| dict.len())),
                (compression::ZstdState::Ready, Some(columns)) if columns == num_columns
            ));
        }

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy.version, loaded_nippy.version);
        assert_eq!(nippy.columns, loaded_nippy.columns);
        assert_eq!(nippy.filter, loaded_nippy.filter);
        assert_eq!(nippy.phf, loaded_nippy.phf);
        assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
        assert_eq!(nippy.path, loaded_nippy.path);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(zstd.use_dict);
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    #[test]
    fn test_lz4() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_lz4();
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Lz4 compressor")
        }
    }

    #[test]
    fn test_zstd_no_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(!zstd.use_dict);

            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    /// Tests `NippyJar` with everything enabled.
    #[test]
    fn test_full_nippy_jar() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        let block_start = 500;

        #[derive(Serialize, Deserialize, Debug)]
        struct BlockJarHeader {
            block_start: usize,
        }

        // Create file
        {
            let mut nippy =
                NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
                    .with_zstd(true, 5000);

            nippy.prepare_compression(data.clone()).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read file
        {
            let loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();

            assert!(loaded_nippy.compressor().is_some());
            assert_eq!(loaded_nippy.user_header().block_start, block_start);

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Iterate over compressed values and compare
                let mut row_num = 0usize;
                while let Some(row) = cursor.next_row().unwrap() {
                    assert_eq!(
                        (row[0], row[1]),
                        (data[0][row_num].as_slice(), data[1][row_num].as_slice())
                    );
                    row_num += 1;
                }

                // Shuffled for chaos.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                for (row_num, (v0, v1)) in data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (v0, v1));
                }
            }
        }
    }

    #[test]
    fn test_selectable_column_values() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        // Create file
        {
            let mut nippy =
                NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
            nippy.prepare_compression(data).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read file
        {
            let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Shuffled for chaos.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                // Imagine the `Blocks` static file has two columns: `Block | StoredWithdrawals`
                const BLOCKS_FULL_MASK: usize = 0b11;

                // Read both columns
                for (row_num, (v0, v1)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_FULL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (*v0, *v1));
                }

                // Read first column only: `Block`
                const BLOCKS_BLOCK_MASK: usize = 0b01;
                for (row_num, (v0, _)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_BLOCK_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v0);
                }

                // Read second column only: `StoredWithdrawals`
                const BLOCKS_WITHDRAWAL_MASK: usize = 0b10;
                for (row_num, (_, v1)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_WITHDRAWAL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v1);
                }

                // Read nothing
                const BLOCKS_EMPTY_MASK: usize = 0b00;
                for (row_num, _) in &data {
                    // Simulates `by_number` queries
                    assert!(cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_EMPTY_MASK)
                        .unwrap()
                        .unwrap()
                        .is_empty());
                }
            }
        }
    }

    #[test]
    fn test_writer() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Appends a third row and prunes two rows, to make sure we prune from both the in-memory
        // and on-disk offset lists
        prune_rows(num_columns, file_path.path(), &col1, &col2);

        // Should be able to append new rows
        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown before there's a chance to commit, and see that it
        // unwinds successfully
        test_append_consistency_no_commit(file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown during commit, and see that it unwinds successfully
        test_append_consistency_partial_commit(file_path.path(), &col1, &col2);
    }

    #[test]
    fn test_pruner() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let num_rows = 2;

        // (missing_offsets, expected number of rows)
        // If a row wasn't fully pruned, then it should be cleaned up as well
        let missing_offsets_scenarios = [(1, 1), (2, 1), (3, 0)];

        for (missing_offsets, expected_rows) in missing_offsets_scenarios {
            let file_path = tempfile::NamedTempFile::new().unwrap();

            append_two_rows(num_columns, file_path.path(), &col1, &col2);

            simulate_interrupted_prune(num_columns, file_path.path(), num_rows, missing_offsets);

            let nippy = NippyJar::load_without_header(file_path.path()).unwrap();
            assert_eq!(nippy.rows, expected_rows);
        }
    }

    fn test_append_consistency_partial_commit(
        file_path: &Path,
        col1: &[Vec<u8>],
        col2: &[Vec<u8>],
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that should be unwound to
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a third row
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Make sure it doesn't write the last offset (which is the expected data file size)
        let _ = writer.offsets_mut().pop();

        // `commit_offsets` is not a pub function. We call it here to simulate a shutdown before
        // it can flush nippy.rows (config) to disk.
        writer.commit_offsets().unwrap();

        // Simulate an unexpected shutdown of the writer, before it can finish commit()
        drop(writer);

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // Data was written successfully
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // It would be +16 (one 8-byte offset per appended column), but one offset is missing:
        // the one we popped above.
        assert_eq!(
            initial_offset_size + 8,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // The writer will run a consistency check, see that the offset list on disk doesn't
        // match nippy.rows, and prune it. Then it will prune the data file accordingly as well.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_offset_size,
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn test_append_consistency_no_commit(file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that should be unwound to
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a third row, so we have an offset list in memory which has not been flushed to
        // disk, while the data has been.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Simulate an unexpected shutdown of the writer, before it can call commit()
        drop(writer);

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // Data was written successfully
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // Since offsets only get written on commit(), this remains the same
        assert_eq!(
            initial_offset_size,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // The writer will run a consistency check, see that the data file has more data than it
        // should, and truncate it to the last offset of the list (the on-disk one here).
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn append_two_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        // Create and add 1 row
        {
            let nippy = NippyJar::new_without_header(num_columns, file_path);
            nippy.freeze_config().unwrap();
            assert_eq!(nippy.max_row_size, 0);
            assert_eq!(nippy.rows, 0);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[0]))).unwrap();
            assert_eq!(writer.column(), 1);
            assert!(writer.is_dirty());

            writer.append_column(Some(Ok(&col2[0]))).unwrap();
            assert!(writer.is_dirty());

            // Adding the last column of a row resets the writer and updates the jar config
            assert_eq!(writer.column(), 0);

            // One offset per column + 1 offset at the end representing the expected data file size
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();
            assert!(!writer.is_dirty());

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 1);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }

        // Load and add 1 row
        {
            let nippy = NippyJar::load_without_header(file_path).unwrap();
            // Check if it was committed successfully
            assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
            assert_eq!(nippy.rows, 1);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[1]))).unwrap();
            assert_eq!(writer.column(), 1);

            writer.append_column(Some(Ok(&col2[1]))).unwrap();

            // Adding the last column of a row resets the writer and updates the jar config
            assert_eq!(writer.column(), 0);

            // One offset per column + 1 offset at the end representing the expected data file size
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 2);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }
    }

    fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let mut writer = NippyJarWriter::new(nippy).unwrap();

        // Appends a third row, so we have an offset list in memory which is not flushed to disk
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();
        assert!(writer.is_dirty());

        // This should prune from both the in-memory and on-disk offset lists
        writer.prune_rows(2).unwrap();
        assert_eq!(writer.rows(), 1);

        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
            1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
        );

        let expected_data_size = col1[0].len() + col2[0].len();
        assert_eq!(
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize,
            expected_data_size
        );

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        {
            let data_reader = nippy.open_data_reader().unwrap();
            // There are only two valid offsets, so index 2 actually represents the expected data
            // file size.
            assert_eq!(data_reader.offset(2).unwrap(), expected_data_size as u64);
        }

        // This should prune from the on-disk offset list and clear the jar.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.prune_rows(1).unwrap();
        assert!(writer.is_dirty());

        assert_eq!(writer.rows(), 0);
        assert_eq!(writer.max_row_size(), 0);
        assert_eq!(File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize, 0);
        // Offset size byte (1) + final offset (8) = 9 bytes
        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize,
            9
        );
        writer.commit().unwrap();
        assert!(!writer.is_dirty());
    }

    fn simulate_interrupted_prune(
        num_columns: usize,
        file_path: &Path,
        num_rows: u64,
        missing_offsets: u64,
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let reader = nippy.open_data_reader().unwrap();
        let offsets_file =
            OpenOptions::new().read(true).write(true).open(nippy.offsets_path()).unwrap();
        let offsets_len = 1 + num_rows * num_columns as u64 * 8 + 8;
        assert_eq!(offsets_len, offsets_file.metadata().unwrap().len());

        let data_file = OpenOptions::new().read(true).write(true).open(nippy.data_path()).unwrap();
        let data_len = reader.reverse_offset(0).unwrap();
        assert_eq!(data_len, data_file.metadata().unwrap().len());

        // Each data column is 32 bytes long. By truncating the data file, the consistency check
        // will go through both branches:
        //   1. the offset list wasn't updated after clearing the data (data_len > last offset);
        //   2. fixing the above leads to an offset count that doesn't match the rows
        //      (* columns) of the configuration file.
        data_file.set_len(data_len - 32 * missing_offsets).unwrap();

        // Runs the consistency check.
        let _ = NippyJarWriter::new(nippy).unwrap();
    }
}