//! Immutable data store format.

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
    error::Error as StdError,
    fs::File,
    io::Read,
    ops::Range,
    path::{Path, PathBuf},
};
use tracing::*;

/// Compression algorithms supported by `NippyJar`.
pub mod compression;
#[cfg(test)]
use compression::Compression;
use compression::Compressors;

/// Empty enum kept for backwards compatibility of the serialized config format.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum Functions {}

/// Empty enum kept for backwards compatibility of the serialized config format.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum InclusionFilters {}

mod error;
pub use error::NippyJarError;

mod cursor;
pub use cursor::NippyJarCursor;

mod writer;
pub use writer::NippyJarWriter;

mod consistency;
pub use consistency::NippyJarChecker;

/// The version of the `NippyJar` format.
const NIPPY_JAR_VERSION: usize = 1;
/// The file extension used for index files.
const INDEX_FILE_EXTENSION: &str = "idx";
/// The file extension used for offsets files.
const OFFSETS_FILE_EXTENSION: &str = "off";
/// The file extension used for config files.
pub const CONFIG_FILE_EXTENSION: &str = "conf";

/// A row of byte-slice column values, each pointing either into an internal buffer or into the
/// memory-mapped data file.
type RefRow<'a> = Vec<&'a [u8]>;

/// Alias for a column value wrapped in a `Result`.
pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;

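/// Trait bound for the user-defined header stored in a [`NippyJar`] config.
///
/// A minimal sketch of a custom header (illustrative; `BlockHeader` and its field are
/// hypothetical, not part of this crate):
///
/// ```ignore
/// use serde::{Deserialize, Serialize};
///
/// #[derive(Debug, Serialize, Deserialize)]
/// struct BlockHeader {
///     first_block: u64,
/// }
///
/// // Any `Send + Sync + Serialize + DeserializeOwned + Debug + 'static` type qualifies, so
/// // `BlockHeader` gets `NippyJarHeader` for free via the blanket implementation below.
/// ```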
pub trait NippyJarHeader:
    Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

// Blanket implementation for any type satisfying the trait bounds.
impl<T> NippyJarHeader for T where
    T: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

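/// `NippyJar` is a specialized storage format designed for immutable data.
///
/// Data is organized in a columnar layout, with one optional compressor applied to all columns.
/// Alongside the main data file, a jar keeps a config file (`.conf`), an offsets file (`.off`)
/// and an index file (`.idx`).
///
/// A minimal end-to-end sketch (illustrative, mirroring this crate's tests; `/tmp/my_jar` is a
/// placeholder path and the writer is assumed to create or heal missing backing files):
///
/// ```ignore
/// use std::path::Path;
///
/// // Create a two-column jar without a user-defined header.
/// let jar = NippyJar::new_without_header(2, Path::new("/tmp/my_jar"));
///
/// // Append one row, one column value at a time, then commit.
/// let mut writer = NippyJarWriter::new(jar)?;
/// writer.append_column(Some(Ok(&b"some value"[..])))?;
/// writer.append_column(Some(Ok(&b"other value"[..])))?;
/// writer.commit()?;
///
/// // Reload the jar and iterate rows through a cursor.
/// let jar = NippyJar::load_without_header(Path::new("/tmp/my_jar"))?;
/// let mut cursor = NippyJarCursor::new(&jar)?;
/// while let Some(row) = cursor.next_row()? {
///     assert_eq!(row.len(), 2);
/// }
/// ```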
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NippyJar<H = ()> {
    /// The version of the `NippyJar` format.
    version: usize,
    /// User-defined header data. Defaults to the zero-sized unit type: no header data.
    user_header: H,
    /// Number of data columns in the jar.
    columns: usize,
    /// Number of data rows in the jar.
    rows: usize,
    /// Optional compression algorithm applied to the data.
    compressor: Option<Compressors>,
    /// Optional field kept for backwards compatibility.
    #[serde(skip)]
    filter: Option<InclusionFilters>,
    /// Optional field kept for backwards compatibility.
    #[serde(skip)]
    phf: Option<Functions>,
    /// Maximum uncompressed row size of the set. Allows decompression without resizing the
    /// output buffer.
    max_row_size: usize,
    /// Data path for the file. Supporting files are derived from it: `{path}.{extension}`.
    #[serde(skip)]
    path: PathBuf,
}

impl<H: NippyJarHeader> std::fmt::Debug for NippyJar<H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NippyJar")
            .field("version", &self.version)
            .field("user_header", &self.user_header)
            .field("rows", &self.rows)
            .field("columns", &self.columns)
            .field("compressor", &self.compressor)
            .field("filter", &self.filter)
            .field("phf", &self.phf)
            .field("path", &self.path)
            .field("max_row_size", &self.max_row_size)
            .finish_non_exhaustive()
    }
}

impl NippyJar<()> {
    /// Creates a new [`NippyJar`] without user-defined header data.
    pub fn new_without_header(columns: usize, path: &Path) -> Self {
        Self::new(columns, path, ())
    }

    /// Loads the file configuration and returns [`Self`] for a jar without user-defined header
    /// data.
    pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
        Self::load(path)
    }
}

impl<H: NippyJarHeader> NippyJar<H> {
    /// Creates a new [`NippyJar`] with user-defined header data.
    pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
        Self {
            version: NIPPY_JAR_VERSION,
            user_header,
            columns,
            rows: 0,
            max_row_size: 0,
            compressor: None,
            filter: None,
            phf: None,
            path: path.to_path_buf(),
        }
    }

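    /// Configures the jar to compress column values with Zstd, optionally using per-column
    /// dictionaries.
    ///
    /// A minimal sketch (illustrative; the path and dictionary size mirror this crate's tests):
    ///
    /// ```ignore
    /// // With dictionaries: training data must be supplied via `prepare_compression`
    /// // before any data can be frozen.
    /// let jar = NippyJar::new_without_header(2, Path::new("/tmp/my_jar")).with_zstd(true, 5000);
    ///
    /// // Without dictionaries: plain Zstd, ready to use immediately.
    /// let jar = NippyJar::new_without_header(2, Path::new("/tmp/my_jar")).with_zstd(false, 5000);
    /// ```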
    pub fn with_zstd(mut self, use_dict: bool, max_dict_size: usize) -> Self {
        self.compressor =
            Some(Compressors::Zstd(compression::Zstd::new(use_dict, max_dict_size, self.columns)));
        self
    }

    /// Configures the jar to compress column values with LZ4.
    pub fn with_lz4(mut self) -> Self {
        self.compressor = Some(Compressors::Lz4(compression::Lz4::default()));
        self
    }

    /// Gets a reference to the user header.
    pub const fn user_header(&self) -> &H {
        &self.user_header
    }

    /// Gets the total number of columns in the jar.
    pub const fn columns(&self) -> usize {
        self.columns
    }

    /// Gets the total number of rows in the jar.
    pub const fn rows(&self) -> usize {
        self.rows
    }

    /// Gets a reference to the compressor, if any.
    pub const fn compressor(&self) -> Option<&Compressors> {
        self.compressor.as_ref()
    }

    /// Gets a mutable reference to the compressor, if any.
    pub const fn compressor_mut(&mut self) -> Option<&mut Compressors> {
        self.compressor.as_mut()
    }

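    /// Loads the file configuration and returns [`Self`].
    ///
    /// A minimal sketch (illustrative; assumes a two-column jar was previously created and its
    /// config frozen at the placeholder path):
    ///
    /// ```ignore
    /// // Reads `/tmp/my_jar.conf` and restores version, column count, row count,
    /// // compressor settings and the user header.
    /// let jar: NippyJar<()> = NippyJar::load(Path::new("/tmp/my_jar"))?;
    /// assert_eq!(jar.columns(), 2);
    /// ```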
    pub fn load(path: &Path) -> Result<Self, NippyJarError> {
        // The config file shares the data file's stem, with the `.conf` extension.
        let config_path = path.with_extension(CONFIG_FILE_EXTENSION);
        let config_file = File::open(&config_path)
            .map_err(|err| reth_fs_util::FsPathError::open(err, config_path))?;

        let mut obj = Self::load_from_reader(config_file)?;
        obj.path = path.to_path_buf();
        Ok(obj)
    }

    /// Deserializes an instance of [`Self`] from a reader over its bincode-encoded config.
    pub fn load_from_reader<R: Read>(reader: R) -> Result<Self, NippyJarError> {
        Ok(bincode::deserialize_from(reader)?)
    }

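    /// Returns the path to the data file.
    ///
    /// Supporting files are derived from it via `Path::with_extension`, so a jar at
    /// `/tmp/my_jar` (illustrative path with no extension of its own) is backed by four files:
    ///
    /// ```ignore
    /// let jar = NippyJar::new_without_header(2, Path::new("/tmp/my_jar"));
    /// assert_eq!(jar.data_path(), Path::new("/tmp/my_jar"));
    /// assert_eq!(jar.index_path(), PathBuf::from("/tmp/my_jar.idx"));
    /// assert_eq!(jar.offsets_path(), PathBuf::from("/tmp/my_jar.off"));
    /// assert_eq!(jar.config_path(), PathBuf::from("/tmp/my_jar.conf"));
    /// ```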
    pub fn data_path(&self) -> &Path {
        self.path.as_ref()
    }

    /// Returns the path to the index file: `{path}.idx`.
    pub fn index_path(&self) -> PathBuf {
        self.path.with_extension(INDEX_FILE_EXTENSION)
    }

    /// Returns the path to the offsets file: `{path}.off`.
    pub fn offsets_path(&self) -> PathBuf {
        self.path.with_extension(OFFSETS_FILE_EXTENSION)
    }

    /// Returns the path to the config file: `{path}.conf`.
    pub fn config_path(&self) -> PathBuf {
        self.path.with_extension(CONFIG_FILE_EXTENSION)
    }

    /// Deletes the data, index, offsets and config files, if they exist.
    pub fn delete(self) -> Result<(), NippyJarError> {
        for path in
            [self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()]
        {
            if path.exists() {
                reth_fs_util::remove_file(path)?;
            }
        }

        Ok(())
    }

    /// Returns a [`DataReader`] over the data and offsets files.
    pub fn open_data_reader(&self) -> Result<DataReader, NippyJarError> {
        DataReader::new(self.data_path())
    }

    /// Writes the configuration to file atomically.
    fn freeze_config(&self) -> Result<(), NippyJarError> {
        Ok(reth_fs_util::atomic_write_file(&self.config_path(), |file| {
            bincode::serialize_into(file, &self)
        })?)
    }
}

#[cfg(test)]
impl<H: NippyJarHeader> NippyJar<H> {
    /// If a compressor is configured, runs any preparation it requires over an early pass of
    /// the data (e.g. Zstd dictionary training).
    pub fn prepare_compression(
        &mut self,
        columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
    ) -> Result<(), NippyJarError> {
        if let Some(compression) = &mut self.compressor {
            debug!(target: "nippy-jar", columns=columns.len(), "Preparing compression.");
            compression.prepare_compression(columns)?;
        }
        Ok(())
    }

    /// Writes all data and configuration to a data file and the offset list to another.
    pub fn freeze(
        self,
        columns: Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
        total_rows: u64,
    ) -> Result<Self, NippyJarError> {
        self.check_before_freeze(&columns)?;

        debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");

        // Creates the writer.
        let mut writer = NippyJarWriter::new(self)?;

        // Appends rows to the data file while holding offsets in memory.
        writer.append_rows(columns, total_rows)?;

        // Flushes configuration and offsets to disk.
        writer.commit()?;

        debug!(target: "nippy-jar", ?writer, "Finished writing data.");

        Ok(writer.into_jar())
    }

    /// Safety checks before writing any data.
    fn check_before_freeze(
        &self,
        columns: &[impl IntoIterator<Item = ColumnResult<Vec<u8>>>],
    ) -> Result<(), NippyJarError> {
        if columns.len() != self.columns {
            return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
        }

        // Check that `prepare_compression` was called when required.
        if let Some(compression) = &self.compressor {
            if !compression.is_ready() {
                return Err(NippyJarError::CompressorNotReady)
            }
        }

        Ok(())
    }
}

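/// Manages reads of the data and offsets files through memory maps.
///
/// A minimal read sketch (illustrative; assumes a jar was previously frozen at the placeholder
/// path):
///
/// ```ignore
/// let jar = NippyJar::load_without_header(Path::new("/tmp/my_jar"))?;
/// let reader = jar.open_data_reader()?;
///
/// // Offsets delimit column values in the data file: value `i` spans the byte range
/// // `offset(i)..offset(i + 1)`, and the final stored offset equals the data file size.
/// let range = reader.offset(0)? as usize..reader.offset(1)? as usize;
/// let first_value: &[u8] = reader.data(range);
/// ```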
#[derive(Debug)]
pub struct DataReader {
    /// Data file descriptor. Needs to be kept alive as long as the `data_mmap` handle.
    #[expect(dead_code)]
    data_file: File,
    /// Mmap handle over the data file.
    data_mmap: Mmap,
    /// File descriptor of the offsets file.
    offset_file: File,
    /// Mmap handle over the offsets file.
    offset_mmap: Mmap,
    /// Number of bytes used to represent one offset.
    offset_size: u8,
}

impl DataReader {
    /// Opens the data and offsets files at `path` and returns a [`DataReader`] over them.
    pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
        let data_file = File::open(path.as_ref())?;
        // SAFETY: the file is read-only and its descriptor is kept alive as long as the mmap
        // handle.
        let data_mmap = unsafe { Mmap::map(&data_file)? };

        let offset_file = File::open(path.as_ref().with_extension(OFFSETS_FILE_EXTENSION))?;
        // SAFETY: the file is read-only and its descriptor is kept alive as long as the mmap
        // handle.
        let offset_mmap = unsafe { Mmap::map(&offset_file)? };

        // The first byte of the offsets file is the size of one offset in bytes.
        let offset_size = offset_mmap[0];

        // Ensure that the size of an offset is at most 8 bytes and at least 1 byte.
        if offset_size > 8 {
            return Err(NippyJarError::OffsetSizeTooBig { offset_size })
        } else if offset_size == 0 {
            return Err(NippyJarError::OffsetSizeTooSmall { offset_size })
        }

        Ok(Self { data_file, data_mmap, offset_file, offset_size, offset_mmap })
    }

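    /// Returns the offset for the requested data index.
    ///
    /// Offsets are stored as fixed-width little-endian integers after a 1-byte size header, so
    /// offset `index` starts at byte `1 + index * offset_size`. A small worked example with
    /// illustrative values:
    ///
    /// ```ignore
    /// // With offset_size = 8, offset 2 occupies bytes [17, 25) of the offsets file:
    /// // 1 (size header) + 2 * 8 = 17.
    /// let value_start = reader.offset(2)?;
    /// ```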
    pub fn offset(&self, index: usize) -> Result<u64, NippyJarError> {
        // `+ 1` skips the offset-size byte at the beginning of the file.
        let from = index * self.offset_size as usize + 1;

        self.offset_at(from)
    }

    /// Returns the offset for the requested data index, counting from the end of the file.
    pub fn reverse_offset(&self, index: usize) -> Result<u64, NippyJarError> {
        let offsets_file_size = self.offset_file.metadata()?.len() as usize;

        if offsets_file_size > 1 {
            let from = offsets_file_size - self.offset_size as usize * (index + 1);

            self.offset_at(from)
        } else {
            Ok(0)
        }
    }

    /// Returns the total number of offsets in the file.
    /// The size of one offset is determined by the file itself.
    pub fn offsets_count(&self) -> Result<usize, NippyJarError> {
        Ok((self.offset_file.metadata()?.len().saturating_sub(1) / self.offset_size as u64)
            as usize)
    }

    /// Reads one offset-sized (as determined by the offsets file) `u64` at the provided byte
    /// position.
    fn offset_at(&self, index: usize) -> Result<u64, NippyJarError> {
        let mut buffer: [u8; 8] = [0; 8];

        let offset_end = index.saturating_add(self.offset_size as usize);
        if offset_end > self.offset_mmap.len() {
            return Err(NippyJarError::OffsetOutOfBounds { index })
        }

        buffer[..self.offset_size as usize].copy_from_slice(&self.offset_mmap[index..offset_end]);
        Ok(u64::from_le_bytes(buffer))
    }

    /// Returns the number of bytes used to represent one offset.
    pub const fn offset_size(&self) -> u8 {
        self.offset_size
    }

    /// Returns the underlying data for the provided byte range.
    pub fn data(&self, range: Range<usize>) -> &[u8] {
        &self.data_mmap[range]
    }

    /// Returns the total size of the data file.
    pub fn size(&self) -> usize {
        self.data_mmap.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use compression::Compression;
    use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
    use std::{fs::OpenOptions, io::Read};

    type ColumnResults<T> = Vec<ColumnResult<T>>;
    type ColumnValues = Vec<Vec<u8>>;

    /// Generates two columns of 100 random 32-byte values each.
    fn test_data(seed: Option<u64>) -> (ColumnValues, ColumnValues) {
        let value_length = 32;
        let num_rows = 100;

        let mut vec: Vec<u8> = vec![0; value_length];
        let mut rng = seed.map(SmallRng::seed_from_u64).unwrap_or_else(SmallRng::from_os_rng);

        let mut entry_gen = || {
            (0..num_rows)
                .map(|_| {
                    rng.fill_bytes(&mut vec[..]);
                    vec.clone()
                })
                .collect()
        };

        (entry_gen(), entry_gen())
    }

    fn clone_with_result(col: &ColumnValues) -> ColumnResults<Vec<u8>> {
        col.iter().map(|v| Ok(v.clone())).collect()
    }

    #[test]
    fn test_config_serialization() {
        let file = tempfile::NamedTempFile::new().unwrap();
        let jar = NippyJar::new_without_header(23, file.path()).with_lz4();
        jar.freeze_config().unwrap();

        let mut config_file = OpenOptions::new().read(true).open(jar.config_path()).unwrap();
        let config_file_len = config_file.metadata().unwrap().len();
        assert_eq!(config_file_len, 37);

        let mut buf = Vec::with_capacity(config_file_len as usize);
        config_file.read_to_end(&mut buf).unwrap();

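        // Expected layout of the 37 bytes below (a breakdown added for clarity, assuming
        // bincode's default fixed-width little-endian encoding):
        //   bytes  0..8   version      = 1  (u64 LE)
        //   bytes  8..16  columns      = 23 (u64 LE)
        //   bytes 16..24  rows         = 0  (u64 LE)
        //   byte  24      Option tag   = 1  (compressor is `Some`)
        //   bytes 25..29  enum variant = 1  (u32 LE, `Compressors::Lz4`)
        //   bytes 29..37  max_row_size = 0  (u64 LE)
        // `filter`, `phf` and `path` are `#[serde(skip)]` and take no space.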
        assert_eq!(
            vec![
                1, 0, 0, 0, 0, 0, 0, 0, 23, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0
            ],
            buf
        );

        let mut read_jar = bincode::deserialize_from::<_, NippyJar>(&buf[..]).unwrap();
        // `path` is not ser/de'd, so it needs to be set manually before comparing.
        read_jar.path = file.path().to_path_buf();
        assert_eq!(jar, read_jar);
    }

    #[test]
    fn test_zstd_with_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor_mut() {
            assert!(matches!(zstd.compressors(), Err(NippyJarError::CompressorNotReady)));

            // Make sure the number of column iterators matches the initial setup.
            assert!(matches!(
                zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]),
                Err(NippyJarError::ColumnLenMismatch(columns, 3)) if columns == num_columns
            ));
        }

        // If Zstd is enabled, writing must fail unless the compressor is ready.
        assert!(matches!(
            nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows),
            Err(NippyJarError::CompressorNotReady)
        ));

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap();

        if let Some(Compressors::Zstd(zstd)) = &nippy.compressor() {
            assert!(matches!(
                (&zstd.state, zstd.dictionaries.as_ref().map(|dict| dict.len())),
                (compression::ZstdState::Ready, Some(columns)) if columns == num_columns
            ));
        }

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy.version, loaded_nippy.version);
        assert_eq!(nippy.columns, loaded_nippy.columns);
        assert_eq!(nippy.filter, loaded_nippy.filter);
        assert_eq!(nippy.phf, loaded_nippy.phf);
        assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
        assert_eq!(nippy.path, loaded_nippy.path);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(zstd.use_dict);
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over the compressed rows and compare them with the original data.
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    #[test]
    fn test_lz4() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_lz4();
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over the compressed rows and compare them with the original data.
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Lz4 compressor")
        }
    }

    #[test]
    fn test_zstd_no_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(!zstd.use_dict);

            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over the compressed rows and compare them with the original data.
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    #[test]
    fn test_full_nippy_jar() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        let block_start = 500;

        #[derive(Serialize, Deserialize, Debug)]
        struct BlockJarHeader {
            block_start: usize,
        }

        // Create the file.
        {
            let mut nippy =
                NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
                    .with_zstd(true, 5000);

            nippy.prepare_compression(data.clone()).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read the file.
        {
            let loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();

            assert!(loaded_nippy.compressor().is_some());
            assert_eq!(loaded_nippy.user_header().block_start, block_start);

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Iterate over the rows in insertion order.
                let mut row_num = 0usize;
                while let Some(row) = cursor.next_row().unwrap() {
                    assert_eq!(
                        (row[0], row[1]),
                        (data[0][row_num].as_slice(), data[1][row_num].as_slice())
                    );
                    row_num += 1;
                }

                // Access rows by number, in shuffled order.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                for (row_num, (v0, v1)) in data {
                    let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (v0, v1));
                }
            }
        }
    }

    #[test]
    fn test_selectable_column_values() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        // Create the file.
        {
            let mut nippy =
                NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
            nippy.prepare_compression(data).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read the file.
        {
            let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Access rows by number, in shuffled order.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                // Read both columns.
                const BLOCKS_FULL_MASK: usize = 0b11;

                for (row_num, (v0, v1)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_FULL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (*v0, *v1));
                }

                // Read only the first column.
                const BLOCKS_BLOCK_MASK: usize = 0b01;
                for (row_num, (v0, _)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_BLOCK_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v0);
                }

                // Read only the second column.
                const BLOCKS_WITHDRAWAL_MASK: usize = 0b10;
                for (row_num, (_, v1)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_WITHDRAWAL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v1);
                }

                // An empty mask returns no columns.
                const BLOCKS_EMPTY_MASK: usize = 0b00;
                for (row_num, _) in &data {
                    assert!(cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_EMPTY_MASK)
                        .unwrap()
                        .unwrap()
                        .is_empty());
                }
            }
        }
    }

    #[test]
    fn test_writer() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        prune_rows(num_columns, file_path.path(), &col1, &col2);

        // Appending new rows to a file with existing data & offsets.
        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // A writer appends a row without committing; the next writer must heal the data file.
        test_append_consistency_no_commit(file_path.path(), &col1, &col2);

        // A writer appends a row and commits the offsets only partially; the next writer must
        // heal both the offsets and the data file.
        test_append_consistency_partial_commit(file_path.path(), &col1, &col2);
    }

    #[test]
    fn test_pruner() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let num_rows = 2;

        // Each scenario: (number of column values missing from the data file, expected number
        // of rows after the writer heals the jar).
        let missing_offsets_scenarios = [(1, 1), (2, 1), (3, 0)];

        for (missing_offsets, expected_rows) in missing_offsets_scenarios {
            let file_path = tempfile::NamedTempFile::new().unwrap();

            append_two_rows(num_columns, file_path.path(), &col1, &col2);

            simulate_interrupted_prune(num_columns, file_path.path(), num_rows, missing_offsets);

            let nippy = NippyJar::load_without_header(file_path.path()).unwrap();
            assert_eq!(nippy.rows, expected_rows);
        }
    }

    fn test_append_consistency_partial_commit(
        file_path: &Path,
        col1: &[Vec<u8>],
        col2: &[Vec<u8>],
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that the jar should be unwound to.
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a third row.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Drop the last offset, so the offsets commit is only partial.
        let _ = writer.offsets_mut().pop();

        // Writes the offsets to disk, but leaves the config (rows) untouched.
        writer.commit_offsets().unwrap();

        // Simulate an unexpected shutdown of the writer, before it can finish commit().
        drop(writer);

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // Data was written successfully.
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // It should be `+ 16` (two 8-byte offsets), but one is missing: the one we popped.
        assert_eq!(
            initial_offset_size + 8,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // The new writer runs a consistency check, notices that the on-disk offset list does
        // not match `rows`, prunes it, and then prunes the data file as well.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_offset_size,
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn test_append_consistency_no_commit(file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that the jar should be unwound to.
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a third row without committing.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Simulate an unexpected shutdown of the writer, before it can call commit().
        drop(writer);

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // Data was written successfully.
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // Since offsets are only written on commit(), the offsets file is unchanged.
        assert_eq!(
            initial_offset_size,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // The new writer runs a consistency check, notices that the data file holds more data
        // than the offsets account for, and truncates it to the last offset on disk.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn append_two_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        // Create a new jar and add one row.
        {
            let nippy = NippyJar::new_without_header(num_columns, file_path);
            nippy.freeze_config().unwrap();
            assert_eq!(nippy.max_row_size, 0);
            assert_eq!(nippy.rows, 0);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[0]))).unwrap();
            assert_eq!(writer.column(), 1);
            assert!(writer.is_dirty());

            writer.append_column(Some(Ok(&col2[0]))).unwrap();
            assert!(writer.is_dirty());

            // Adding the last column of a row resets the column tracker.
            assert_eq!(writer.column(), 0);

            // One offset per column value, plus the final offset, which equals the expected
            // data file size.
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();
            assert!(!writer.is_dirty());

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 1);
            // Offsets file: 1 size byte + one 8-byte offset per column value + final offset.
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }

        // Load the existing jar and add one more row.
        {
            let nippy = NippyJar::load_without_header(file_path).unwrap();
            // Check that the first row was committed successfully.
            assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
            assert_eq!(nippy.rows, 1);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[1]))).unwrap();
            assert_eq!(writer.column(), 1);

            writer.append_column(Some(Ok(&col2[1]))).unwrap();

            // Adding the last column of a row resets the column tracker.
            assert_eq!(writer.column(), 0);

            // One offset per column value, plus the final offset.
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 2);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }
    }

    fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let mut writer = NippyJarWriter::new(nippy).unwrap();

        // Appends a third row, so there is an offset list in memory that is not yet flushed
        // to disk.
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();
        assert!(writer.is_dirty());

        // This should prune from both the in-memory and the on-disk offset lists.
        writer.prune_rows(2).unwrap();
        assert_eq!(writer.rows(), 1);

        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
            1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
        );

        let expected_data_size = col1[0].len() + col2[0].len();
        assert_eq!(
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize,
            expected_data_size
        );

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        {
            let data_reader = nippy.open_data_reader().unwrap();
            // Only two valid offsets remain, so index 2 holds the final offset, which equals
            // the expected data file size.
            assert_eq!(data_reader.offset(2).unwrap(), expected_data_size as u64);
        }

        // This should prune the remaining row from the on-disk offset list and clear the jar.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.prune_rows(1).unwrap();
        assert!(writer.is_dirty());

        assert_eq!(writer.rows(), 0);
        assert_eq!(writer.max_row_size(), 0);
        assert_eq!(File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize, 0);
        // Only the byte indicating how many bytes make up one offset should be left.
        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize,
            1
        );
        writer.commit().unwrap();
        assert!(!writer.is_dirty());
    }

    fn simulate_interrupted_prune(
        num_columns: usize,
        file_path: &Path,
        num_rows: u64,
        missing_offsets: u64,
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let reader = nippy.open_data_reader().unwrap();
        let offsets_file =
            OpenOptions::new().read(true).write(true).open(nippy.offsets_path()).unwrap();
        let offsets_len = 1 + num_rows * num_columns as u64 * 8 + 8;
        assert_eq!(offsets_len, offsets_file.metadata().unwrap().len());

        let data_file = OpenOptions::new().read(true).write(true).open(nippy.data_path()).unwrap();
        let data_len = reader.reverse_offset(0).unwrap();
        assert_eq!(data_len, data_file.metadata().unwrap().len());

        // Each test value is 32 bytes long (see `test_data`), so truncating the data file by
        // `32 * missing_offsets` leaves that many column values dangling in the offsets file,
        // as if a prune removed the data but was interrupted before updating the offsets and
        // the config.
        data_file.set_len(data_len - 32 * missing_offsets).unwrap();

        // Creating a new writer runs the consistency check and heals the files.
        let _ = NippyJarWriter::new(nippy).unwrap();
    }
}