1#![doc(
8 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
9 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
10 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
11)]
12#![cfg_attr(not(test), warn(unused_crate_dependencies))]
13#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
14
15use memmap2::Mmap;
16use serde::{Deserialize, Serialize};
17use std::{
18 error::Error as StdError,
19 fs::File,
20 io::Read,
21 ops::Range,
22 path::{Path, PathBuf},
23};
24use tracing::*;
25
26pub mod compression;
28#[cfg(test)]
29use compression::Compression;
30use compression::Compressors;
31
/// Available perfect-hashing function kinds.
///
/// Currently has no variants; the type is kept so the serialized configuration
/// and the `phf` field on [`NippyJar`] can reference it.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum Functions {}
36
/// Available inclusion-filter kinds.
///
/// Currently has no variants; the type is kept so the `filter` field on
/// [`NippyJar`] can reference it.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum InclusionFilters {}
41
/// Error types used across the crate.
mod error;
pub use error::NippyJarError;

/// Cursor for reading rows out of a jar.
mod cursor;
pub use cursor::NippyJarCursor;

/// Writer used to append and prune rows.
mod writer;
pub use writer::NippyJarWriter;

/// Consistency checking between data, offsets and config files.
mod consistency;
pub use consistency::NippyJarChecker;
53
/// Version of the nippy-jar file format produced by this crate.
const NIPPY_JAR_VERSION: usize = 1;
/// File extension of the index file.
const INDEX_FILE_EXTENSION: &str = "idx";
/// File extension of the offsets file.
const OFFSETS_FILE_EXTENSION: &str = "off";
/// File extension of the serialized [`NippyJar`] configuration file.
pub const CONFIG_FILE_EXTENSION: &str = "conf";

/// A borrowed row: one byte slice per column.
type RefRow<'a> = Vec<&'a [u8]>;

/// Alias type for a column value wrapped in a fallible result.
pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;
69
/// Trait for the user-defined header stored inside a [`NippyJar`]
/// configuration.
///
/// Automatically implemented (via the blanket impl) for any `Send + Sync +
/// 'static` type that is serde-(de)serializable and `Debug`.
pub trait NippyJarHeader:
    Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}
75
// Blanket implementation: every type satisfying the bounds is a valid header.
impl<T> NippyJarHeader for T where
    T: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}
81
/// A jar-like container for columnar, optionally compressed data. The
/// configuration below is persisted to a sidecar `.conf` file via bincode,
/// while the row data, offsets and index live in sibling files sharing the
/// same base path (see `data_path`/`index_path`/`offsets_path`).
///
/// `H` is a user-defined header serialized alongside the core configuration.
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NippyJar<H = ()> {
    /// File-format version of this jar.
    version: usize,
    /// User-defined header data persisted in the configuration file.
    user_header: H,
    /// Number of columns per row.
    columns: usize,
    /// Number of committed rows.
    rows: usize,
    /// Optional compression algorithm applied to column values.
    compressor: Option<Compressors>,
    /// Optional inclusion filter. Not serialized; [`InclusionFilters`]
    /// currently has no variants.
    #[serde(skip)]
    filter: Option<InclusionFilters>,
    /// Optional perfect-hashing function. Not serialized; [`Functions`]
    /// currently has no variants.
    #[serde(skip)]
    phf: Option<Functions>,
    /// Size in bytes of the largest row stored so far.
    max_row_size: usize,
    /// Base path of the data file. Not serialized — restored by [`Self::load`].
    #[serde(skip)]
    path: PathBuf,
}
113
114impl<H: NippyJarHeader> std::fmt::Debug for NippyJar<H> {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 f.debug_struct("NippyJar")
117 .field("version", &self.version)
118 .field("user_header", &self.user_header)
119 .field("rows", &self.rows)
120 .field("columns", &self.columns)
121 .field("compressor", &self.compressor)
122 .field("filter", &self.filter)
123 .field("phf", &self.phf)
124 .field("path", &self.path)
125 .field("max_row_size", &self.max_row_size)
126 .finish_non_exhaustive()
127 }
128}
129
impl NippyJar<()> {
    /// Creates a new [`NippyJar`] without a user-defined header (unit type).
    pub fn new_without_header(columns: usize, path: &Path) -> Self {
        Self::new(columns, path, ())
    }

    /// Loads a [`NippyJar`] that was created without a user-defined header.
    pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
        Self::load(path)
    }
}
141
impl<H: NippyJarHeader> NippyJar<H> {
    /// Creates a new [`NippyJar`] with a user-defined header, rooted at `path`.
    pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
        Self {
            version: NIPPY_JAR_VERSION,
            user_header,
            columns,
            rows: 0,
            max_row_size: 0,
            compressor: None,
            filter: None,
            phf: None,
            path: path.to_path_buf(),
        }
    }

    /// Enables zstd compression, optionally with per-column dictionaries
    /// capped at `max_dict_size` bytes.
    pub fn with_zstd(mut self, use_dict: bool, max_dict_size: usize) -> Self {
        self.compressor =
            Some(Compressors::Zstd(compression::Zstd::new(use_dict, max_dict_size, self.columns)));
        self
    }

    /// Enables lz4 compression.
    pub fn with_lz4(mut self) -> Self {
        self.compressor = Some(Compressors::Lz4(compression::Lz4::default()));
        self
    }

    /// Returns the user-defined header.
    pub const fn user_header(&self) -> &H {
        &self.user_header
    }

    /// Returns the number of columns per row.
    pub const fn columns(&self) -> usize {
        self.columns
    }

    /// Returns the number of committed rows.
    pub const fn rows(&self) -> usize {
        self.rows
    }

    /// Returns a reference to the configured compressor, if any.
    pub const fn compressor(&self) -> Option<&Compressors> {
        self.compressor.as_ref()
    }

    /// Returns a mutable reference to the configured compressor, if any.
    pub const fn compressor_mut(&mut self) -> Option<&mut Compressors> {
        self.compressor.as_mut()
    }

    /// Loads the configuration stored next to `path` (with the `.conf`
    /// extension) and restores the non-serialized `path` field on the result.
    ///
    /// # Errors
    /// Fails if the config file cannot be opened or deserialized.
    pub fn load(path: &Path) -> Result<Self, NippyJarError> {
        let config_path = path.with_extension(CONFIG_FILE_EXTENSION);
        let config_file = File::open(&config_path)
            .map_err(|err| reth_fs_util::FsPathError::open(err, config_path))?;

        // `path` is `serde(skip)`, so it must be re-attached after decoding.
        let mut obj = Self::load_from_reader(config_file)?;
        obj.path = path.to_path_buf();
        Ok(obj)
    }

    /// Deserializes a jar configuration from a bincode-encoded reader.
    ///
    /// The `path` field is left at its default; callers such as [`Self::load`]
    /// are responsible for setting it.
    pub fn load_from_reader<R: Read>(reader: R) -> Result<Self, NippyJarError> {
        Ok(bincode::deserialize_from(reader)?)
    }

    /// Returns the path of the data file.
    pub fn data_path(&self) -> &Path {
        self.path.as_ref()
    }

    /// Returns the path of the index file (data path with `.idx`).
    pub fn index_path(&self) -> PathBuf {
        self.path.with_extension(INDEX_FILE_EXTENSION)
    }

    /// Returns the path of the offsets file (data path with `.off`).
    pub fn offsets_path(&self) -> PathBuf {
        self.path.with_extension(OFFSETS_FILE_EXTENSION)
    }

    /// Returns the path of the config file (data path with `.conf`).
    pub fn config_path(&self) -> PathBuf {
        self.path.with_extension(CONFIG_FILE_EXTENSION)
    }

    /// Deletes the data, index, offsets and config files belonging to this
    /// jar, skipping any that do not exist.
    pub fn delete(self) -> Result<(), NippyJarError> {
        for path in
            [self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()]
        {
            if path.exists() {
                debug!(target: "nippy-jar", ?path, "Removing file.");
                reth_fs_util::remove_file(path)?;
            }
        }

        Ok(())
    }

    /// Opens a [`DataReader`] over this jar's data and offsets files.
    pub fn open_data_reader(&self) -> Result<DataReader, NippyJarError> {
        DataReader::new(self.data_path())
    }

    /// Atomically writes the bincode-serialized configuration to the config
    /// file path.
    fn freeze_config(&self) -> Result<(), NippyJarError> {
        Ok(reth_fs_util::atomic_write_file(&self.config_path(), |file| {
            bincode::serialize_into(file, &self)
        })?)
    }
}
263
// Test-only convenience methods for building a jar in one shot.
#[cfg(test)]
impl<H: NippyJarHeader> NippyJar<H> {
    /// Prepares the configured compressor from sample column data (e.g.
    /// trains zstd dictionaries). No-op when no compressor is configured.
    pub fn prepare_compression(
        &mut self,
        columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
    ) -> Result<(), NippyJarError> {
        if let Some(compression) = &mut self.compressor {
            debug!(target: "nippy-jar", columns=columns.len(), "Preparing compression.");
            compression.prepare_compression(columns)?;
        }
        Ok(())
    }

    /// Writes all `columns` (one iterator per column, each yielding row
    /// values) to the data file, commits, and returns the updated jar.
    ///
    /// # Errors
    /// Fails if the column count mismatches the configuration or if the
    /// configured compressor has not been prepared.
    pub fn freeze(
        self,
        columns: Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
        total_rows: u64,
    ) -> Result<Self, NippyJarError> {
        self.check_before_freeze(&columns)?;

        debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");

        let mut writer = NippyJarWriter::new(self)?;

        writer.append_rows(columns, total_rows)?;

        writer.commit()?;

        debug!(target: "nippy-jar", ?writer, "Finished writing data.");

        Ok(writer.into_jar())
    }

    /// Validates `freeze` preconditions: matching column count and a ready
    /// compressor (when one is configured).
    fn check_before_freeze(
        &self,
        columns: &[impl IntoIterator<Item = ColumnResult<Vec<u8>>>],
    ) -> Result<(), NippyJarError> {
        if columns.len() != self.columns {
            return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
        }

        if let Some(compression) = &self.compressor {
            if !compression.is_ready() {
                return Err(NippyJarError::CompressorNotReady)
            }
        }

        Ok(())
    }
}
321
/// Read-only handle over a jar's data and offsets files, both memory-mapped.
#[derive(Debug)]
pub struct DataReader {
    /// Data file handle. Never read directly, but kept open alongside its
    /// mapping for the lifetime of the reader.
    #[expect(dead_code)]
    data_file: File,
    /// Memory map over the data file.
    data_mmap: Mmap,
    /// Offsets file handle; also queried for the current file length.
    offset_file: File,
    /// Memory map over the offsets file.
    offset_mmap: Mmap,
    /// Size in bytes of each stored offset, read from the first byte of the
    /// offsets file. Validated to be in `1..=8`.
    offset_size: u8,
}
339
impl DataReader {
    /// Opens and memory-maps the data file at `path` and its sibling offsets
    /// file (`path` with the `.off` extension), then validates the offset
    /// width stored in the offsets file's first byte.
    ///
    /// # Errors
    /// Fails if either file cannot be opened/mapped, or if the stored offset
    /// size is outside `1..=8`.
    pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
        let data_file = File::open(path.as_ref())?;
        // SAFETY: file is opened read-only and the `File` is stored next to
        // its mapping. As with any mmap, external truncation of the file
        // while mapped is undefined behavior.
        let data_mmap = unsafe { Mmap::map(&data_file)? };

        let offset_file = File::open(path.as_ref().with_extension(OFFSETS_FILE_EXTENSION))?;
        // SAFETY: same reasoning as the data-file mapping above.
        let offset_mmap = unsafe { Mmap::map(&offset_file)? };

        // First byte of the offsets file encodes how many bytes each offset
        // occupies.
        // NOTE(review): this indexing assumes the offsets file is non-empty —
        // confirm the writer always persists at least this leading size byte.
        let offset_size = offset_mmap[0];

        // Offsets are decoded into a `u64` (8 bytes max), and a zero width
        // would make every offset read empty.
        if offset_size > 8 {
            return Err(NippyJarError::OffsetSizeTooBig { offset_size })
        } else if offset_size == 0 {
            return Err(NippyJarError::OffsetSizeTooSmall { offset_size })
        }

        Ok(Self { data_file, data_mmap, offset_file, offset_size, offset_mmap })
    }

    /// Returns the stored offset with position `index`.
    pub fn offset(&self, index: usize) -> Result<u64, NippyJarError> {
        // + 1 skips the leading byte holding `offset_size` itself.
        let from = index * self.offset_size as usize + 1;

        self.offset_at(from)
    }

    /// Returns the offset at `index` counting from the end of the offsets
    /// file: `reverse_offset(0)` is the last stored offset.
    pub fn reverse_offset(&self, index: usize) -> Result<u64, NippyJarError> {
        let offsets_file_size = self.offset_file.metadata()?.len() as usize;

        if offsets_file_size > 1 {
            let from = offsets_file_size - self.offset_size as usize * (index + 1);

            self.offset_at(from)
        } else {
            // Only the leading size byte (or nothing) exists: no offsets yet.
            Ok(0)
        }
    }

    /// Returns the number of offsets stored in the offsets file, excluding
    /// the leading size byte.
    pub fn offsets_count(&self) -> Result<usize, NippyJarError> {
        Ok((self.offset_file.metadata()?.len().saturating_sub(1) / self.offset_size as u64)
            as usize)
    }

    /// Reads one little-endian offset, `offset_size` bytes wide, starting at
    /// byte position `index` of the mapped offsets file.
    fn offset_at(&self, index: usize) -> Result<u64, NippyJarError> {
        let mut buffer: [u8; 8] = [0; 8];

        // Bounds-check against the mapping before slicing to avoid a panic.
        let offset_end = index.saturating_add(self.offset_size as usize);
        if offset_end > self.offset_mmap.len() {
            return Err(NippyJarError::OffsetOutOfBounds { index })
        }

        buffer[..self.offset_size as usize].copy_from_slice(&self.offset_mmap[index..offset_end]);
        Ok(u64::from_le_bytes(buffer))
    }

    /// Returns the size in bytes of each stored offset.
    pub const fn offset_size(&self) -> u8 {
        self.offset_size
    }

    /// Returns the bytes of the mapped data file within `range`.
    ///
    /// # Panics
    /// Panics if `range` exceeds the data mapping.
    pub fn data(&self, range: Range<usize>) -> &[u8] {
        &self.data_mmap[range]
    }

    /// Returns the total size of the mapped data file.
    pub fn size(&self) -> usize {
        self.data_mmap.len()
    }
}
420
421#[cfg(test)]
422mod tests {
423 use super::*;
424 use compression::Compression;
425 use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
426 use std::{fs::OpenOptions, io::Read};
427
    // Per-column list of fallible values, as consumed by `freeze`.
    type ColumnResults<T> = Vec<ColumnResult<T>>;
    // Per-column list of raw test values.
    type ColumnValues = Vec<Vec<u8>>;
430
    /// Generates two columns of 100 random 32-byte values; passing a `seed`
    /// makes the output reproducible.
    fn test_data(seed: Option<u64>) -> (ColumnValues, ColumnValues) {
        let value_length = 32;
        let num_rows = 100;

        let mut vec: Vec<u8> = vec![0; value_length];
        let mut rng = seed.map(SmallRng::seed_from_u64).unwrap_or_else(SmallRng::from_os_rng);

        // Reuses one scratch buffer, cloning it once per generated row.
        let mut entry_gen = || {
            (0..num_rows)
                .map(|_| {
                    rng.fill_bytes(&mut vec[..]);
                    vec.clone()
                })
                .collect()
        };

        (entry_gen(), entry_gen())
    }
449
450 fn clone_with_result(col: &ColumnValues) -> ColumnResults<Vec<u8>> {
451 col.iter().map(|v| Ok(v.clone())).collect()
452 }
453
    /// Freezes a config to disk, pins its exact bincode byte layout, and
    /// round-trips it back through deserialization.
    #[test]
    fn test_config_serialization() {
        let file = tempfile::NamedTempFile::new().unwrap();
        let jar = NippyJar::new_without_header(23, file.path()).with_lz4();
        jar.freeze_config().unwrap();

        let mut config_file = OpenOptions::new().read(true).open(jar.config_path()).unwrap();
        let config_file_len = config_file.metadata().unwrap().len();
        assert_eq!(config_file_len, 37);

        let mut buf = Vec::with_capacity(config_file_len as usize);
        config_file.read_to_end(&mut buf).unwrap();

        // Exact serialized bytes: version, columns, rows, compressor, etc.
        assert_eq!(
            vec![
                1, 0, 0, 0, 0, 0, 0, 0, 23, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0
            ],
            buf
        );

        let mut read_jar = bincode::deserialize_from::<_, NippyJar>(&buf[..]).unwrap();
        // `path` is `serde(skip)`, so restore it manually before comparing.
        read_jar.path = file.path().to_path_buf();
        assert_eq!(jar, read_jar);
    }
480
    /// Exercises zstd with dictionaries: compression must be prepared before
    /// freezing, and a frozen jar must round-trip through `load` and be
    /// readable row-by-row via a cursor.
    #[test]
    fn test_zstd_with_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor_mut() {
            assert!(matches!(zstd.compressors(), Err(NippyJarError::CompressorNotReady)));

            // Preparing with the wrong number of columns must be rejected.
            assert!(matches!(
                zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]),
                Err(NippyJarError::ColumnLenMismatch(columns, 3)) if columns == num_columns
            ));
        }

        // Freezing before the dictionaries are trained must fail.
        assert!(matches!(
            nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows),
            Err(NippyJarError::CompressorNotReady)
        ));

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap();

        // After preparation there should be one dictionary per column.
        if let Some(Compressors::Zstd(zstd)) = &nippy.compressor() {
            assert!(matches!(
                (&zstd.state, zstd.dictionaries.as_ref().map(|dict| dict.len())),
                (compression::ZstdState::Ready, Some(columns)) if columns == num_columns
            ));
        }

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy.version, loaded_nippy.version);
        assert_eq!(nippy.columns, loaded_nippy.columns);
        assert_eq!(nippy.filter, loaded_nippy.filter);
        assert_eq!(nippy.phf, loaded_nippy.phf);
        assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
        assert_eq!(nippy.path, loaded_nippy.path);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(zstd.use_dict);
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Decompressed rows must match the original input, in order.
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }
554
    /// Freezes a jar with lz4 compression, reloads it, and verifies the
    /// cursor yields the original rows in order.
    #[test]
    fn test_lz4() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_lz4();
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Decompressed rows must match the original input, in order.
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Lz4 compressor")
        }
    }
591
    /// Freezes a jar with dictionary-less zstd (no preparation required),
    /// reloads it, and verifies the cursor yields the original rows.
    #[test]
    fn test_zstd_no_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(!zstd.use_dict);

            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Decompressed rows must match the original input, in order.
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }
631
    /// End-to-end test with a custom user header: freeze with zstd
    /// dictionaries, reload, then read rows sequentially and by random row
    /// number.
    #[test]
    fn test_full_nippy_jar() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        let block_start = 500;

        #[derive(Serialize, Deserialize, Debug)]
        struct BlockJarHeader {
            block_start: usize,
        }

        // Create.
        {
            let mut nippy =
                NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
                    .with_zstd(true, 5000);

            nippy.prepare_compression(data.clone()).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read.
        {
            let loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();

            assert!(loaded_nippy.compressor().is_some());
            assert_eq!(loaded_nippy.user_header().block_start, block_start);

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Sequential access.
                let mut row_num = 0usize;
                while let Some(row) = cursor.next_row().unwrap() {
                    assert_eq!(
                        (row[0], row[1]),
                        (data[0][row_num].as_slice(), data[1][row_num].as_slice())
                    );
                    row_num += 1;
                }

                // Random access by row number, in shuffled order.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                for (row_num, (v0, v1)) in data {
                    let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (v0, v1));
                }
            }
        }
    }
692
    /// Reads rows back with column-selection bitmasks: both columns, only the
    /// first, only the second, and none.
    #[test]
    fn test_selectable_column_values() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        // Create.
        {
            let mut nippy =
                NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
            nippy.prepare_compression(data).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read.
        {
            let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Shuffle row order to exercise random access.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                // Both columns selected.
                const BLOCKS_FULL_MASK: usize = 0b11;

                for (row_num, (v0, v1)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_FULL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (*v0, *v1));
                }

                // Only the first column selected.
                const BLOCKS_BLOCK_MASK: usize = 0b01;
                for (row_num, (v0, _)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_BLOCK_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v0);
                }

                // Only the second column selected.
                const BLOCKS_WITHDRAWAL_MASK: usize = 0b10;
                for (row_num, (_, v1)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_WITHDRAWAL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v1);
                }

                // No columns selected: an empty row is returned.
                const BLOCKS_EMPTY_MASK: usize = 0b00;
                for (row_num, _) in &data {
                    assert!(cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_EMPTY_MASK)
                        .unwrap()
                        .unwrap()
                        .is_empty());
                }
            }
        }
    }
772
    /// Drives the writer through append → prune → append plus the two
    /// crash-consistency scenarios below, all against one jar file.
    #[test]
    fn test_writer() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        prune_rows(num_columns, file_path.path(), &col1, &col2);

        // Appending after a prune should behave like a fresh append.
        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Data written but nothing committed before the writer was dropped.
        test_append_consistency_no_commit(file_path.path(), &col1, &col2);

        // Offsets committed, but the config (row count) was not.
        test_append_consistency_partial_commit(file_path.path(), &col1, &col2);
    }
795
    /// Recovery after an interrupted prune: for each scenario the data file is
    /// truncated by `missing_offsets` 32-byte values and a subsequent load
    /// must report `expected_rows` consistent rows.
    #[test]
    fn test_pruner() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let num_rows = 2;

        // (missing_offsets, expected_rows) per scenario.
        let missing_offsets_scenarios = [(1, 1), (2, 1), (3, 0)];

        for (missing_offsets, expected_rows) in missing_offsets_scenarios {
            let file_path = tempfile::NamedTempFile::new().unwrap();

            append_two_rows(num_columns, file_path.path(), &col1, &col2);

            simulate_interrupted_prune(num_columns, file_path.path(), num_rows, missing_offsets);

            let nippy = NippyJar::load_without_header(file_path.path()).unwrap();
            assert_eq!(nippy.rows, expected_rows);
        }
    }
817
    /// Simulates a crash between committing offsets and committing the
    /// config: data and offsets files hold an extra (partial) row that a
    /// freshly created writer must truncate away.
    fn test_append_consistency_partial_commit(
        file_path: &Path,
        col1: &[Vec<u8>],
        col2: &[Vec<u8>],
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Drop the last pending offset to fake a partially written offsets list.
        let _ = writer.offsets_mut().pop();

        // Commit the (incomplete) offsets, but never commit the config.
        writer.commit_offsets().unwrap();

        drop(writer);

        // The config still reports the old row count.
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // The data file grew by one full row's bytes...
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // ...but only one extra 8-byte offset was persisted.
        assert_eq!(
            initial_offset_size + 8,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // Creating a writer heals the inconsistency: both files are truncated
        // back to match the committed row count.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_offset_size,
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }
877
    /// Simulates a crash before any commit: appended data reaches the data
    /// file, but neither offsets nor config were persisted; a new writer must
    /// truncate the stray bytes.
    fn test_append_consistency_no_commit(file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Append a row, then drop the writer without committing anything.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        drop(writer);

        // The config row count is unchanged.
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // The data file holds the extra row's bytes...
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // ...while the offsets file was never extended.
        assert_eq!(
            initial_offset_size,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // A new writer truncates the data file back to the committed size.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }
922
    /// Appends one row per writer session (two sessions total), asserting
    /// column position, dirtiness, row counts and on-disk file sizes after
    /// each commit.
    fn append_two_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        // Create a fresh jar and append the first row.
        {
            let nippy = NippyJar::new_without_header(num_columns, file_path);
            nippy.freeze_config().unwrap();
            assert_eq!(nippy.max_row_size, 0);
            assert_eq!(nippy.rows, 0);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[0]))).unwrap();
            assert_eq!(writer.column(), 1);
            assert!(writer.is_dirty());

            writer.append_column(Some(Ok(&col2[0]))).unwrap();
            assert!(writer.is_dirty());

            // Completing a row wraps the column counter back to zero.
            assert_eq!(writer.column(), 0);

            // The last pending offset equals the data file size after commit.
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();
            assert!(!writer.is_dirty());

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 1);
            // Offsets file: 1 size byte + 8 bytes per column offset + final offset.
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }

        // Reload the existing jar and append a second row.
        {
            let nippy = NippyJar::load_without_header(file_path).unwrap();
            assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
            assert_eq!(nippy.rows, 1);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[1]))).unwrap();
            assert_eq!(writer.column(), 1);

            writer.append_column(Some(Ok(&col2[1]))).unwrap();

            // Column counter wraps again at the end of the row.
            assert_eq!(writer.column(), 0);

            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 2);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }
    }
997
    /// Prunes rows down to one and then to zero, checking that uncommitted
    /// data is discarded and that the on-disk files shrink accordingly.
    fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let mut writer = NippyJarWriter::new(nippy).unwrap();

        // Append a third (uncommitted) row before pruning.
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();
        assert!(writer.is_dirty());

        // Pruning two rows removes the uncommitted row plus one committed row.
        writer.prune_rows(2).unwrap();
        assert_eq!(writer.rows(), 1);

        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
            1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
        );

        let expected_data_size = col1[0].len() + col2[0].len();
        assert_eq!(
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize,
            expected_data_size
        );

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        {
            let data_reader = nippy.open_data_reader().unwrap();
            // The last stored offset (index 2) equals the data file size.
            assert_eq!(data_reader.offset(2).unwrap(), expected_data_size as u64);
        }

        // Prune the remaining row, emptying the jar.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.prune_rows(1).unwrap();
        assert!(writer.is_dirty());

        assert_eq!(writer.rows(), 0);
        assert_eq!(writer.max_row_size(), 0);
        assert_eq!(File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize, 0);
        // Only the leading size byte remains in the offsets file.
        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize,
            1
        );
        writer.commit().unwrap();
        assert!(!writer.is_dirty());
    }
1046
    /// Truncates the data file by `32 * missing_offsets` bytes (each test
    /// value is 32 bytes) to mimic a prune interrupted after shrinking the
    /// data file but before updating offsets/config, then re-creates a writer
    /// which is expected to restore consistency.
    fn simulate_interrupted_prune(
        num_columns: usize,
        file_path: &Path,
        num_rows: u64,
        missing_offsets: u64,
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let reader = nippy.open_data_reader().unwrap();
        let offsets_file =
            OpenOptions::new().read(true).write(true).open(nippy.offsets_path()).unwrap();
        // 1 size byte + one 8-byte offset per column value + the final offset.
        let offsets_len = 1 + num_rows * num_columns as u64 * 8 + 8;
        assert_eq!(offsets_len, offsets_file.metadata().unwrap().len());

        let data_file = OpenOptions::new().read(true).write(true).open(nippy.data_path()).unwrap();
        // The last stored offset equals the current data file length.
        let data_len = reader.reverse_offset(0).unwrap();
        assert_eq!(data_len, data_file.metadata().unwrap().len());

        // Drop trailing values from the data file without touching offsets.
        data_file.set_len(data_len - 32 * missing_offsets).unwrap();

        // Re-creating the writer must detect and heal the inconsistency.
        let _ = NippyJarWriter::new(nippy).unwrap();
    }
1074}