#![doc(
    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]

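//! Immutable, columnar data-file format ("NippyJar"): a data file plus sidecar
//! files for offsets (`.off`), an optional index (`.idx`), and the serialized
//! configuration (`.conf`), with optional Zstd or LZ4 value compression.
//!
//! A minimal read-side sketch, mirroring the tests at the bottom of this file
//! (the crate-name import and `path` are illustrative; not run as a doctest):
//!
//! ```ignore
//! use reth_nippy_jar::{NippyJar, NippyJarCursor};
//!
//! let jar = NippyJar::load_without_header(path)?;
//! let mut cursor = NippyJarCursor::new(&jar)?;
//! while let Some(row) = cursor.next_row()? {
//!     // `row` is one `&[u8]` slice per column.
//! }
//! ```
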
use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
    error::Error as StdError,
    fs::File,
    io::{self, Read, Write},
    ops::Range,
    path::{Path, PathBuf},
};
use tracing::*;

/// Compression algorithms (Zstd, LZ4) applied to column values.
pub mod compression;
#[cfg(test)]
use compression::Compression;
use compression::Compressors;

/// Placeholder for perfect hashing functions; currently has no variants.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum Functions {}

/// Placeholder for inclusion filters; currently has no variants.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum InclusionFilters {}

mod error;
pub use error::NippyJarError;

mod cursor;
pub use cursor::NippyJarCursor;

mod writer;
pub use writer::NippyJarWriter;

mod consistency;
pub use consistency::NippyJarChecker;

/// The version of the jar file format.
const NIPPY_JAR_VERSION: usize = 1;
/// File extension of the sidecar index file.
const INDEX_FILE_EXTENSION: &str = "idx";
/// File extension of the sidecar offsets file.
const OFFSETS_FILE_EXTENSION: &str = "off";
/// File extension of the serialized [`NippyJar`] configuration.
pub const CONFIG_FILE_EXTENSION: &str = "conf";
/// File extension of the changeset offsets file.
pub const CHANGESET_OFFSETS_FILE_EXTENSION: &str = "csoff";

/// A borrowed row: one byte slice per column.
type RefRow<'a> = Vec<&'a [u8]>;

/// Alias for the result of a fallible column value.
pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;

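/// A user-defined header stored in the jar's configuration file: any
/// `Send + Sync` type that serde can round-trip and that implements `Debug`.
/// The blanket impl below covers every such type automatically. An
/// illustrative header (not part of this crate):
///
/// ```ignore
/// #[derive(Debug, serde::Serialize, serde::Deserialize)]
/// struct SegmentHeader { first_block: u64 }
/// ```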
pub trait NippyJarHeader:
    Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

impl<T> NippyJarHeader for T where
    T: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

/// Configuration and metadata of an immutable, columnar data file. This is the
/// struct that gets (de)serialized to the `.conf` sidecar file.
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NippyJar<H = ()> {
    /// The version of the jar file format.
    version: usize,
    /// User-defined header data.
    user_header: H,
    /// Number of columns.
    columns: usize,
    /// Number of rows.
    rows: usize,
    /// Optional compression algorithm applied to column values.
    compressor: Option<Compressors>,
    /// Optional inclusion filter; not persisted.
    #[serde(skip)]
    filter: Option<InclusionFilters>,
    /// Optional perfect hashing function; not persisted.
    #[serde(skip)]
    phf: Option<Functions>,
    /// Size in bytes of the largest row.
    max_row_size: usize,
    /// Base path of the data file; the sidecar files share this path with
    /// different extensions. Restored on load, never serialized.
    #[serde(skip)]
    path: PathBuf,
}

impl<H: NippyJarHeader> std::fmt::Debug for NippyJar<H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NippyJar")
            .field("version", &self.version)
            .field("user_header", &self.user_header)
            .field("rows", &self.rows)
            .field("columns", &self.columns)
            .field("compressor", &self.compressor)
            .field("filter", &self.filter)
            .field("phf", &self.phf)
            .field("path", &self.path)
            .field("max_row_size", &self.max_row_size)
            .finish_non_exhaustive()
    }
}

impl NippyJar<()> {
    /// Creates a new [`NippyJar`] without a user-defined header.
    pub fn new_without_header(columns: usize, path: &Path) -> Self {
        Self::new(columns, path, ())
    }

    /// Loads a [`NippyJar`] that was created without a user-defined header.
    pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
        Self::load(path)
    }
}

impl<H: NippyJarHeader> NippyJar<H> {
    /// Creates a new [`NippyJar`] with a user-defined header.
    pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
        Self {
            version: NIPPY_JAR_VERSION,
            user_header,
            columns,
            rows: 0,
            max_row_size: 0,
            compressor: None,
            filter: None,
            phf: None,
            path: path.to_path_buf(),
        }
    }

    /// Adds Zstd compression, optionally backed by per-column dictionaries.
    pub fn with_zstd(mut self, use_dict: bool, max_dict_size: usize) -> Self {
        self.compressor =
            Some(Compressors::Zstd(compression::Zstd::new(use_dict, max_dict_size, self.columns)));
        self
    }

    /// Adds LZ4 compression.
    pub fn with_lz4(mut self) -> Self {
        self.compressor = Some(Compressors::Lz4(compression::Lz4::default()));
        self
    }

    /// Returns the user-defined header.
    pub const fn user_header(&self) -> &H {
        &self.user_header
    }

    /// Returns the number of columns.
    pub const fn columns(&self) -> usize {
        self.columns
    }

    /// Returns the number of rows.
    pub const fn rows(&self) -> usize {
        self.rows
    }

    /// Returns the compressor, if any.
    pub const fn compressor(&self) -> Option<&Compressors> {
        self.compressor.as_ref()
    }

    /// Returns a mutable reference to the compressor, if any.
    pub const fn compressor_mut(&mut self) -> Option<&mut Compressors> {
        self.compressor.as_mut()
    }

    /// Loads a jar's configuration from the `.conf` sidecar of `path` and
    /// restores `path` on the returned value. The data file itself is not
    /// opened here.
    pub fn load(path: &Path) -> Result<Self, NippyJarError> {
        let config_path = path.with_extension(CONFIG_FILE_EXTENSION);
        let config_file = File::open(&config_path)
            .inspect_err(|e| {
                warn!(?path, %e, "Failed to load static file jar");
            })
            .map_err(|err| reth_fs_util::FsPathError::open(err, config_path))?;

        let mut obj = Self::load_from_reader(io::BufReader::new(config_file))?;
        obj.path = path.to_path_buf();
        Ok(obj)
    }

    /// Deserializes a [`NippyJar`] from a bincode-encoded reader.
    pub fn load_from_reader<R: Read>(reader: R) -> Result<Self, NippyJarError> {
        Ok(bincode::deserialize_from(reader)?)
    }

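    /// Serializes the configuration into a bincode-encoded writer. A minimal
    /// round-trip sketch with the reader above (illustrative; `jar` is any
    /// headerless jar):
    ///
    /// ```ignore
    /// let mut bytes = Vec::new();
    /// jar.save_to_writer(&mut bytes)?;
    /// let read_back = NippyJar::<()>::load_from_reader(&bytes[..])?;
    /// ```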
    pub fn save_to_writer<W: Write>(&self, writer: W) -> Result<(), NippyJarError> {
        Ok(bincode::serialize_into(writer, self)?)
    }

    /// Returns the path of the data file.
    pub fn data_path(&self) -> &Path {
        self.path.as_ref()
    }

    /// Returns the path of the index file.
    pub fn index_path(&self) -> PathBuf {
        self.path.with_extension(INDEX_FILE_EXTENSION)
    }

    /// Returns the path of the offsets file.
    pub fn offsets_path(&self) -> PathBuf {
        self.path.with_extension(OFFSETS_FILE_EXTENSION)
    }

    /// Returns the path of the config file.
    pub fn config_path(&self) -> PathBuf {
        self.path.with_extension(CONFIG_FILE_EXTENSION)
    }

    /// Returns the path of the changeset offsets file.
    pub fn changeset_offsets_path(&self) -> PathBuf {
        self.path.with_extension(CHANGESET_OFFSETS_FILE_EXTENSION)
    }

    /// Deletes the jar: removes the data file and every sidecar file that
    /// exists.
    pub fn delete(self) -> Result<(), NippyJarError> {
        for path in [
            self.data_path().into(),
            self.index_path(),
            self.offsets_path(),
            self.config_path(),
            self.changeset_offsets_path(),
        ] {
            if path.exists() {
                debug!(target: "nippy-jar", ?path, "Removing file.");
                reth_fs_util::remove_file(path)?;
            }
        }

        Ok(())
    }

    /// Opens a [`DataReader`] over the data and offsets files.
    pub fn open_data_reader(&self) -> Result<DataReader, NippyJarError> {
        DataReader::new(self.data_path())
    }

    /// Atomically writes the configuration to the `.conf` sidecar file.
    fn freeze_config(&self) -> Result<(), NippyJarError> {
        Ok(reth_fs_util::atomic_write_file(&self.config_path(), |file| self.save_to_writer(file))?)
    }
}

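/// Test-only helpers that build and persist a jar in one shot. A minimal
/// sketch of the flow, mirroring the tests below (`path`, `col1`, `col2`,
/// `rows1`, `rows2` and `num_rows` are illustrative):
///
/// ```ignore
/// let mut jar = NippyJar::new_without_header(2, path).with_zstd(true, 5000);
/// jar.prepare_compression(vec![col1, col2])?; // train per-column dictionaries
/// let jar = jar.freeze(vec![rows1, rows2], num_rows)?; // write data + commit
/// ```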
#[cfg(test)]
impl<H: NippyJarHeader> NippyJar<H> {
    /// If a compressor is set, trains it on the given column values (e.g.
    /// builds the Zstd dictionaries).
    pub fn prepare_compression(
        &mut self,
        columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
    ) -> Result<(), NippyJarError> {
        if let Some(compression) = &mut self.compressor {
            debug!(target: "nippy-jar", columns=columns.len(), "Preparing compression.");
            compression.prepare_compression(columns)?;
        }
        Ok(())
    }

    /// Writes all column values to the data file, commits data, offsets and
    /// configuration, and returns the updated jar.
    pub fn freeze(
        self,
        columns: Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
        total_rows: u64,
    ) -> Result<Self, NippyJarError> {
        self.check_before_freeze(&columns)?;

        debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");

        // Write phase: create the writer, append every row, then commit.
        let mut writer = NippyJarWriter::new(self)?;
        writer.append_rows(columns, total_rows)?;
        writer.commit()?;

        debug!(target: "nippy-jar", ?writer, "Finished writing data.");

        Ok(writer.into_jar())
    }

    /// Sanity checks before freezing: the column count must match and the
    /// compressor, if any, must already be trained.
    fn check_before_freeze(
        &self,
        columns: &[impl IntoIterator<Item = ColumnResult<Vec<u8>>>],
    ) -> Result<(), NippyJarError> {
        if columns.len() != self.columns {
            return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
        }

        if let Some(compression) = &self.compressor &&
            !compression.is_ready()
        {
            return Err(NippyJarError::CompressorNotReady)
        }

        Ok(())
    }
}

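/// Memory-mapped reader over a jar's data and offsets files. A minimal usage
/// sketch (illustrative; `jar` is a loaded [`NippyJar`] with at least one row):
///
/// ```ignore
/// let reader = jar.open_data_reader()?;
/// let start = reader.offset(0)? as usize; // first offset: start of the data
/// let end = reader.offset(1)? as usize;   // next offset bounds the value
/// let first_value = reader.data(start..end); // first column value of row 0
/// ```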
#[derive(Debug)]
pub struct DataReader {
    /// Handle to the data file, kept open alongside its mmap.
    #[expect(dead_code)]
    data_file: File,
    /// Memory map over the data file.
    data_mmap: Mmap,
    /// Handle to the offsets file, used to query its current length.
    offset_file: File,
    /// Memory map over the offsets file.
    offset_mmap: Mmap,
    /// Size in bytes of one stored offset, read from the first byte of the
    /// offsets file. At most 8, since offsets are `u64`.
    offset_size: u8,
}

impl DataReader {
    /// Memory-maps the data file at `path` and its offsets sidecar.
    pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
        let data_file = File::open(path.as_ref())?;
        // SAFETY: `Mmap::map` requires that the file is not truncated or
        // mutated underneath the map while it is alive.
        let data_mmap = unsafe { Mmap::map(&data_file)? };

        let offset_file = File::open(path.as_ref().with_extension(OFFSETS_FILE_EXTENSION))?;
        // SAFETY: as above.
        let offset_mmap = unsafe { Mmap::map(&offset_file)? };

        // The first byte of the offsets file stores the width of each offset.
        let offset_size = offset_mmap[0];

        // Offsets are stored as little-endian `u64`, so the width must be 1..=8.
        if offset_size > 8 {
            return Err(NippyJarError::OffsetSizeTooBig { offset_size })
        } else if offset_size == 0 {
            return Err(NippyJarError::OffsetSizeTooSmall { offset_size })
        }

        Ok(Self { data_file, data_mmap, offset_file, offset_size, offset_mmap })
    }

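    /// Returns the offset stored at position `index`.
    ///
    /// Layout of the offsets file, as consumed by the readers below:
    ///
    /// ```text
    /// byte 0    offset_size (1..=8)
    /// byte 1..  offsets, little-endian, `offset_size` bytes each; the first
    ///           offset is the start of the data and the last one equals the
    ///           data file's size
    /// ```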
    pub fn offset(&self, index: usize) -> Result<u64, NippyJarError> {
        // +1 skips the leading byte, which stores `offset_size` itself.
        let from = index * self.offset_size as usize + 1;

        self.offset_at(from)
    }

    /// Returns the offset at position `index` counted from the end of the
    /// file: `reverse_offset(0)` is the last offset, i.e. the data file size.
    pub fn reverse_offset(&self, index: usize) -> Result<u64, NippyJarError> {
        let offsets_file_size = self.offset_file.metadata()?.len() as usize;

        if offsets_file_size > 1 {
            let from = offsets_file_size - self.offset_size as usize * (index + 1);

            self.offset_at(from)
        } else {
            Ok(0)
        }
    }

    /// Returns the number of offsets stored in the offsets file, excluding the
    /// leading size byte.
    pub fn offsets_count(&self) -> Result<usize, NippyJarError> {
        Ok((self.offset_file.metadata()?.len().saturating_sub(1) / self.offset_size as u64)
            as usize)
    }

    /// Reads one little-endian offset starting at byte position `index` of the
    /// offsets mmap.
    fn offset_at(&self, index: usize) -> Result<u64, NippyJarError> {
        let mut buffer: [u8; 8] = [0; 8];

        let offset_end = index.saturating_add(self.offset_size as usize);
        if offset_end > self.offset_mmap.len() {
            return Err(NippyJarError::OffsetOutOfBounds { index })
        }

        // Copy `offset_size` bytes into the low bytes of the little-endian buffer.
        buffer[..self.offset_size as usize].copy_from_slice(&self.offset_mmap[index..offset_end]);
        Ok(u64::from_le_bytes(buffer))
    }

    /// Returns the size in bytes of one stored offset.
    pub const fn offset_size(&self) -> u8 {
        self.offset_size
    }

    /// Returns the bytes of the data file within `range`.
    pub fn data(&self, range: Range<usize>) -> &[u8] {
        &self.data_mmap[range]
    }

    /// Returns the size of the data file in bytes.
    pub fn size(&self) -> usize {
        self.data_mmap.len()
    }

    /// Returns the size of the offsets file in bytes.
    pub fn offsets_size(&self) -> usize {
        self.offset_mmap.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use compression::Compression;
    use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
    use std::{fs::OpenOptions, io::Read};

    type ColumnResults<T> = Vec<ColumnResult<T>>;
    type ColumnValues = Vec<Vec<u8>>;

    /// Generates two columns of 100 random 32-byte values each.
    fn test_data(seed: Option<u64>) -> (ColumnValues, ColumnValues) {
        let value_length = 32;
        let num_rows = 100;

        let mut vec: Vec<u8> = vec![0; value_length];
        let mut rng = seed.map(SmallRng::seed_from_u64).unwrap_or_else(SmallRng::from_os_rng);

        let mut entry_gen = || {
            (0..num_rows)
                .map(|_| {
                    rng.fill_bytes(&mut vec[..]);
                    vec.clone()
                })
                .collect()
        };

        (entry_gen(), entry_gen())
    }

    /// Wraps each value in `Ok` to mimic a fallible column source.
    fn clone_with_result(col: &ColumnValues) -> ColumnResults<Vec<u8>> {
        col.iter().map(|v| Ok(v.clone())).collect()
    }

    #[test]
    fn test_config_serialization() {
        let file = tempfile::NamedTempFile::new().unwrap();
        let jar = NippyJar::new_without_header(23, file.path()).with_lz4();
        jar.freeze_config().unwrap();

        let mut config_file = OpenOptions::new().read(true).open(jar.config_path()).unwrap();
        let config_file_len = config_file.metadata().unwrap().len();
        assert_eq!(config_file_len, 37);

        let mut buf = Vec::with_capacity(config_file_len as usize);
        config_file.read_to_end(&mut buf).unwrap();

        // 37 bytes: version (8) + columns (8) + rows (8) + compressor `Some`
        // tag (1) + Lz4 variant index (4) + max_row_size (8). Skipped fields
        // are absent from the encoding.
        assert_eq!(
            vec![
                1, 0, 0, 0, 0, 0, 0, 0, 23, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0
            ],
            buf
        );

        let mut read_jar = bincode::deserialize_from::<_, NippyJar>(&buf[..]).unwrap();
        // The path is not serialized, so restore it manually before comparing.
        read_jar.path = file.path().to_path_buf();
        assert_eq!(jar, read_jar);
    }

    #[test]
    fn test_zstd_with_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor_mut() {
            assert!(matches!(zstd.compressors(), Err(NippyJarError::CompressorNotReady)));

            // Training with a mismatched number of column iterators (3 vs 2)
            // errors out.
            assert!(matches!(
                zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]),
                Err(NippyJarError::ColumnLenMismatch(columns, 3)) if columns == num_columns
            ));
        }

        // Freezing before the dictionaries are trained errors out.
        assert!(matches!(
            nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows),
            Err(NippyJarError::CompressorNotReady)
        ));

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap();

        if let Some(Compressors::Zstd(zstd)) = &nippy.compressor() {
            assert!(matches!(
                (&zstd.state, zstd.dictionaries.as_ref().map(|dict| dict.len())),
                (compression::ZstdState::Ready, Some(columns)) if columns == num_columns
            ));
        }

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy.version, loaded_nippy.version);
        assert_eq!(nippy.columns, loaded_nippy.columns);
        assert_eq!(nippy.filter, loaded_nippy.filter);
        assert_eq!(nippy.phf, loaded_nippy.phf);
        assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
        assert_eq!(nippy.path, loaded_nippy.path);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(zstd.use_dict);
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over the whole jar and check that every row round-trips.
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    #[test]
    fn test_lz4() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_lz4();
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Lz4 compressor")
        }
    }

    #[test]
    fn test_zstd_no_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(!zstd.use_dict);

            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    #[test]
    fn test_full_nippy_jar() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        let block_start = 500;

        #[derive(Serialize, Deserialize, Debug)]
        struct BlockJarHeader {
            block_start: usize,
        }

        // Write a jar with a user-defined header.
        {
            let mut nippy =
                NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
                    .with_zstd(true, 5000);

            nippy.prepare_compression(data.clone()).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read it back.
        {
            let loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();

            assert!(loaded_nippy.compressor().is_some());
            assert_eq!(loaded_nippy.user_header().block_start, block_start);

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Sequential access.
                let mut row_num = 0usize;
                while let Some(row) = cursor.next_row().unwrap() {
                    assert_eq!(
                        (row[0], row[1]),
                        (data[0][row_num].as_slice(), data[1][row_num].as_slice())
                    );
                    row_num += 1;
                }

                // Random access in shuffled order.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                for (row_num, (v0, v1)) in data {
                    let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (v0, v1));
                }
            }
        }
    }

    #[test]
    fn test_selectable_column_values() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        // Write.
        {
            let mut nippy =
                NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
            nippy.prepare_compression(data).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read back with column selection masks.
        {
            let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                // Both columns selected.
                const BLOCKS_FULL_MASK: usize = 0b11;
                for (row_num, (v0, v1)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_FULL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (*v0, *v1));
                }

                // Only the first column selected.
                const BLOCKS_BLOCK_MASK: usize = 0b01;
                for (row_num, (v0, _)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_BLOCK_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v0);
                }

                // Only the second column selected.
                const BLOCKS_WITHDRAWAL_MASK: usize = 0b10;
                for (row_num, (_, v1)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_WITHDRAWAL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v1);
                }

                // No columns selected yields an empty row.
                const BLOCKS_EMPTY_MASK: usize = 0b00;
                for (row_num, _) in &data {
                    assert!(cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_EMPTY_MASK)
                        .unwrap()
                        .unwrap()
                        .is_empty());
                }
            }
        }
    }

    #[test]
    fn test_writer() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Appending again after pruning rows should work.
        prune_rows(num_columns, file_path.path(), &col1, &col2);
        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Stale data from uncommitted or partially committed writes must be
        // healed when a new writer is created.
        test_append_consistency_no_commit(file_path.path(), &col1, &col2);
        test_append_consistency_partial_commit(file_path.path(), &col1, &col2);
    }

    #[test]
    fn test_pruner() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let num_rows = 2;

        // (number of column values truncated from the data file,
        //  rows expected after recovery)
        let missing_offsets_scenarios = [(1, 1), (2, 1), (3, 0)];

        for (missing_offsets, expected_rows) in missing_offsets_scenarios {
            let file_path = tempfile::NamedTempFile::new().unwrap();

            append_two_rows(num_columns, file_path.path(), &col1, &col2);

            simulate_interrupted_prune(num_columns, file_path.path(), num_rows, missing_offsets);

            let nippy = NippyJar::load_without_header(file_path.path()).unwrap();
            assert_eq!(nippy.rows, expected_rows);
        }
    }

    fn test_append_consistency_partial_commit(
        file_path: &Path,
        col1: &[Vec<u8>],
        col2: &[Vec<u8>],
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a single row of data.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Drop one offset, then commit only the offsets: this simulates a
        // partial commit where the config (row count) was never updated.
        let _ = writer.offsets_mut().pop();
        writer.commit_offsets().unwrap();

        drop(writer);

        // The config still claims the initial row count...
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // ...while the data file has grown by the appended row...
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // ...and the offsets file by one offset (8 bytes), since one was popped.
        assert_eq!(
            initial_offset_size + 8,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // A new writer must detect the inconsistency and heal it by truncating
        // the data and offsets files back to the committed row count.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_offset_size,
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn test_append_consistency_no_commit(file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a single row of data without committing anything.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        drop(writer);

        // The config was never updated, so the row count is unchanged.
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // The data file has grown by the uncommitted row...
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // ...but no offsets were flushed.
        assert_eq!(
            initial_offset_size,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // A new writer must detect the stale data and truncate it away.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn append_two_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        // Create a new jar and append the first row.
        {
            let nippy = NippyJar::new_without_header(num_columns, file_path);
            nippy.freeze_config().unwrap();
            assert_eq!(nippy.max_row_size, 0);
            assert_eq!(nippy.rows, 0);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[0]))).unwrap();
            assert_eq!(writer.column(), 1);
            assert!(writer.is_dirty());

            writer.append_column(Some(Ok(&col2[0]))).unwrap();
            assert!(writer.is_dirty());

            // Appending the last column of a row resets the column counter.
            assert_eq!(writer.column(), 0);

            // The buffered offsets track the appended values; the last one
            // equals the resulting data file size.
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();
            assert!(!writer.is_dirty());

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 1);
            // 1 byte for the offset width, 8 bytes per column offset, plus the
            // final offset.
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }

        // Load the existing jar and append a second row.
        {
            let nippy = NippyJar::load_without_header(file_path).unwrap();
            assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
            assert_eq!(nippy.rows, 1);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[1]))).unwrap();
            assert_eq!(writer.column(), 1);

            writer.append_column(Some(Ok(&col2[1]))).unwrap();

            // Appending the last column of a row resets the column counter.
            assert_eq!(writer.column(), 0);

            // Only the pending row's offsets are buffered in memory; the last
            // one again equals the resulting data file size.
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 2);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }
    }

    fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let mut writer = NippyJarWriter::new(nippy).unwrap();

        // Appends a third, uncommitted row on top of the two committed ones.
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();
        assert!(writer.is_dirty());

        // Pruning two rows removes the uncommitted row and one committed row.
        writer.prune_rows(2).unwrap();
        assert_eq!(writer.rows(), 1);

        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
            1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
        );

        // Only the first row's data should remain.
        let expected_data_size = col1[0].len() + col2[0].len();
        assert_eq!(
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize,
            expected_data_size
        );

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        {
            let data_reader = nippy.open_data_reader().unwrap();
            // With one row of two columns left, the last offset (index 2)
            // equals the data file size.
            assert_eq!(data_reader.offset(2).unwrap(), expected_data_size as u64);
        }

        // Pruning the last remaining row leaves an empty jar: only the offset
        // width byte and the final offset (1 + 8 = 9 bytes) remain on disk.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.prune_rows(1).unwrap();
        assert!(writer.is_dirty());

        assert_eq!(writer.rows(), 0);
        assert_eq!(writer.max_row_size(), 0);
        assert_eq!(File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize, 0);
        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize,
            9
        );
        writer.commit().unwrap();
        assert!(!writer.is_dirty());
    }

    fn simulate_interrupted_prune(
        num_columns: usize,
        file_path: &Path,
        num_rows: u64,
        missing_offsets: u64,
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let reader = nippy.open_data_reader().unwrap();
        let offsets_file =
            OpenOptions::new().read(true).write(true).open(nippy.offsets_path()).unwrap();
        let offsets_len = 1 + num_rows * num_columns as u64 * 8 + 8;
        assert_eq!(offsets_len, offsets_file.metadata().unwrap().len());

        let data_file = OpenOptions::new().read(true).write(true).open(nippy.data_path()).unwrap();
        let data_len = reader.reverse_offset(0).unwrap();
        assert_eq!(data_len, data_file.metadata().unwrap().len());

        // Simulate a prune interrupted after truncating the data file (each
        // value is 32 bytes) but before the offsets file and config caught up.
        data_file.set_len(data_len - 32 * missing_offsets).unwrap();

        // Creating a writer triggers the consistency check that heals the jar.
        let _ = NippyJarWriter::new(nippy).unwrap();
    }
}