//! Immutable, columnar data store format used by reth's static files.

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]

use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
    error::Error as StdError,
    fs::File,
    io::{Read, Write},
    ops::Range,
    path::{Path, PathBuf},
};
use tracing::*;

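/// Compression algorithms supported by [`NippyJar`].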
pub mod compression;
#[cfg(test)]
use compression::Compression;
use compression::Compressors;

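/// Perfect-hashing functions that a jar can be configured with. No variants are
/// currently implemented, so the `phf` field of a [`NippyJar`] is always `None`.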
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum Functions {}

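/// Inclusion filters that a jar can be configured with. No variants are currently
/// implemented, so the `filter` field of a [`NippyJar`] is always `None`.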
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum InclusionFilters {}

mod error;
pub use error::NippyJarError;

mod cursor;
pub use cursor::NippyJarCursor;

mod writer;
pub use writer::NippyJarWriter;

mod consistency;
pub use consistency::NippyJarChecker;

/// The version of the `NippyJar` format.
const NIPPY_JAR_VERSION: usize = 1;

/// The file extension used for index files.
const INDEX_FILE_EXTENSION: &str = "idx";

/// The file extension used for offsets files.
const OFFSETS_FILE_EXTENSION: &str = "off";

/// The file extension used for configuration files.
pub const CONFIG_FILE_EXTENSION: &str = "conf";

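/// A reference to a row, where each column value is borrowed as a byte slice.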
type RefRow<'a> = Vec<&'a [u8]>;

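/// Alias type for a column value wrapped in `Result`.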
pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;

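/// Trait bound for the user-defined header of a [`NippyJar`]. Blanket-implemented for
/// every type that is `Send + Sync + Serialize + for<'b> Deserialize<'b> + Debug +
/// 'static`.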
pub trait NippyJarHeader:
    Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

impl<T> NippyJarHeader for T where
    T: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

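/// `NippyJar` is a specialized storage format designed for immutable data.
///
/// Data is organized in a columnar layout: values are appended column by column, an
/// offsets file records where each value starts, and retrieval goes through the offsets
/// file and reads the data file via `mmap`.
///
/// A minimal usage sketch. This is illustrative only (the `path` variable and the data
/// are assumptions) and mirrors the tests at the bottom of this file:
///
/// ```ignore
/// // Create a two-column jar, persist its config, then append one row.
/// let jar = NippyJar::new_without_header(2, path);
/// jar.freeze_config()?;
///
/// let mut writer = NippyJarWriter::new(jar)?;
/// writer.append_column(Some(Ok(b"first column value".as_slice())))?;
/// writer.append_column(Some(Ok(b"second column value".as_slice())))?;
/// writer.commit()?;
///
/// // Read it back through a cursor.
/// let jar = NippyJar::load_without_header(path)?;
/// let mut cursor = NippyJarCursor::new(&jar)?;
/// while let Some(row) = cursor.next_row()? {
///     // `row` is a `Vec<&[u8]>`, one slice per column.
/// }
/// ```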
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NippyJar<H = ()> {
    /// The version of the `NippyJar` format.
    version: usize,
    /// User-defined header data.
    user_header: H,
    /// Number of data columns in the jar.
    columns: usize,
    /// Number of data rows in the jar.
    rows: usize,
    /// Optional compression algorithm applied to the data.
    compressor: Option<Compressors>,
    /// Optional inclusion filter. [`InclusionFilters`] has no variants, so this is
    /// always `None` and is skipped during (de)serialization.
    #[serde(skip)]
    filter: Option<InclusionFilters>,
    /// Optional perfect-hashing function. [`Functions`] has no variants, so this is
    /// always `None` and is skipped during (de)serialization.
    #[serde(skip)]
    phf: Option<Functions>,
    /// Maximum uncompressed row size of the set. Allows the decompression buffer to be
    /// sized without ahead-of-time knowledge of the data.
    max_row_size: usize,
    /// Data path for the file. Supporting files (index, offsets, config) share the same
    /// stem with different extensions. Not serialized; set when the jar is loaded.
    #[serde(skip)]
    path: PathBuf,
}

impl<H: NippyJarHeader> std::fmt::Debug for NippyJar<H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NippyJar")
            .field("version", &self.version)
            .field("user_header", &self.user_header)
            .field("rows", &self.rows)
            .field("columns", &self.columns)
            .field("compressor", &self.compressor)
            .field("filter", &self.filter)
            .field("phf", &self.phf)
            .field("path", &self.path)
            .field("max_row_size", &self.max_row_size)
            .finish_non_exhaustive()
    }
}

impl NippyJar<()> {
    /// Creates a new [`NippyJar`] without a user-defined header.
    pub fn new_without_header(columns: usize, path: &Path) -> Self {
        Self::new(columns, path, ())
    }

    /// Loads a [`NippyJar`] that was created without a user-defined header.
    pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
        Self::load(path)
    }
}

impl<H: NippyJarHeader> NippyJar<H> {
    /// Creates a new [`NippyJar`] with a user-defined header.
    pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
        Self {
            version: NIPPY_JAR_VERSION,
            user_header,
            columns,
            rows: 0,
            max_row_size: 0,
            compressor: None,
            filter: None,
            phf: None,
            path: path.to_path_buf(),
        }
    }

    /// Enables zstd compression, optionally trained with dictionaries (one per column).
    pub fn with_zstd(mut self, use_dict: bool, max_dict_size: usize) -> Self {
        self.compressor =
            Some(Compressors::Zstd(compression::Zstd::new(use_dict, max_dict_size, self.columns)));
        self
    }

    /// Enables lz4 compression.
    pub fn with_lz4(mut self) -> Self {
        self.compressor = Some(Compressors::Lz4(compression::Lz4::default()));
        self
    }

    /// Returns a reference to the user header.
    pub const fn user_header(&self) -> &H {
        &self.user_header
    }

    /// Returns the number of columns.
    pub const fn columns(&self) -> usize {
        self.columns
    }

    /// Returns the number of rows.
    pub const fn rows(&self) -> usize {
        self.rows
    }

    /// Returns a reference to the compressor, if any.
    pub const fn compressor(&self) -> Option<&Compressors> {
        self.compressor.as_ref()
    }

    /// Returns a mutable reference to the compressor, if any.
    pub const fn compressor_mut(&mut self) -> Option<&mut Compressors> {
        self.compressor.as_mut()
    }

    /// Loads the file configuration and returns [`Self`].
    ///
    /// The caller must ensure that the header type `H` matches the one the jar was
    /// created with, since the bincode-encoded configuration is not self-describing.
    pub fn load(path: &Path) -> Result<Self, NippyJarError> {
        // `path` is the data file path; the config file shares its stem.
        let config_path = path.with_extension(CONFIG_FILE_EXTENSION);
        let config_file = File::open(&config_path)
            .inspect_err(|e| {
                warn!(?path, %e, "Failed to load static file jar");
            })
            .map_err(|err| reth_fs_util::FsPathError::open(err, config_path))?;

        let mut obj = Self::load_from_reader(config_file)?;
        obj.path = path.to_path_buf();
        Ok(obj)
    }

    /// Deserializes an instance from a reader. Note that `path` is skipped during
    /// (de)serialization and must be set by the caller.
    pub fn load_from_reader<R: Read>(reader: R) -> Result<Self, NippyJarError> {
        Ok(bincode::deserialize_from(reader)?)
    }

    /// Serializes the configuration into a writer.
    pub fn save_to_writer<W: Write>(&self, writer: W) -> Result<(), NippyJarError> {
        Ok(bincode::serialize_into(writer, self)?)
    }

    /// Returns the path to the data file.
    pub fn data_path(&self) -> &Path {
        self.path.as_ref()
    }

    /// Returns the path to the index file.
    pub fn index_path(&self) -> PathBuf {
        self.path.with_extension(INDEX_FILE_EXTENSION)
    }

    /// Returns the path to the offsets file.
    pub fn offsets_path(&self) -> PathBuf {
        self.path.with_extension(OFFSETS_FILE_EXTENSION)
    }

    /// Returns the path to the config file.
    pub fn config_path(&self) -> PathBuf {
        self.path.with_extension(CONFIG_FILE_EXTENSION)
    }

    /// Deletes the data, index, offsets and config files belonging to this jar, if they
    /// exist.
    pub fn delete(self) -> Result<(), NippyJarError> {
        for path in
            [self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()]
        {
            if path.exists() {
                debug!(target: "nippy-jar", ?path, "Removing file.");
                reth_fs_util::remove_file(path)?;
            }
        }

        Ok(())
    }

    /// Opens a [`DataReader`] over the data and offsets files.
    pub fn open_data_reader(&self) -> Result<DataReader, NippyJarError> {
        DataReader::new(self.data_path())
    }

    /// Atomically writes the current configuration to the config file.
    fn freeze_config(&self) -> Result<(), NippyJarError> {
        Ok(reth_fs_util::atomic_write_file(&self.config_path(), |file| self.save_to_writer(file))?)
    }
}

#[cfg(test)]
impl<H: NippyJarHeader> NippyJar<H> {
    /// If the configured compressor requires a training step (e.g. zstd with
    /// dictionaries), trains it on the provided column values.
    pub fn prepare_compression(
        &mut self,
        columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
    ) -> Result<(), NippyJarError> {
        if let Some(compression) = &mut self.compressor {
            debug!(target: "nippy-jar", columns=columns.len(), "Preparing compression.");
            compression.prepare_compression(columns)?;
        }
        Ok(())
    }

    /// Writes all rows and the configuration to their respective files and returns the
    /// frozen jar.
    pub fn freeze(
        self,
        columns: Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
        total_rows: u64,
    ) -> Result<Self, NippyJarError> {
        self.check_before_freeze(&columns)?;

        debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");

        let mut writer = NippyJarWriter::new(self)?;

        writer.append_rows(columns, total_rows)?;

        writer.commit()?;

        debug!(target: "nippy-jar", ?writer, "Finished writing data.");

        Ok(writer.into_jar())
    }

    /// Safety checks performed before writing any data.
    fn check_before_freeze(
        &self,
        columns: &[impl IntoIterator<Item = ColumnResult<Vec<u8>>>],
    ) -> Result<(), NippyJarError> {
        if columns.len() != self.columns {
            return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
        }

        if let Some(compression) = &self.compressor &&
            !compression.is_ready()
        {
            return Err(NippyJarError::CompressorNotReady)
        }

        Ok(())
    }
}

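/// Manages the reading of static file data using memory-mapped files.
///
/// The offsets file layout is a single leading byte holding the offset size `n`
/// (`1..=8`), followed by consecutive `n`-byte little-endian offsets.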
#[derive(Debug)]
pub struct DataReader {
    /// Data file descriptor. Needs to be kept alive as long as `data_mmap`.
    #[expect(dead_code)]
    data_file: File,
    /// Mmap handle for the data file.
    data_mmap: Mmap,
    /// Offsets file descriptor. Used to query file metadata, and kept alive as long as
    /// `offset_mmap`.
    offset_file: File,
    /// Mmap handle for the offsets file.
    offset_mmap: Mmap,
    /// Number of bytes that represent one offset.
    offset_size: u8,
}

impl DataReader {
    /// Opens the data and offsets files at `path` and memory-maps both.
    pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
        let data_file = File::open(path.as_ref())?;
        // SAFETY: the file is read-only and its descriptor is kept alive as long as the
        // mmap handle.
        let data_mmap = unsafe { Mmap::map(&data_file)? };

        let offset_file = File::open(path.as_ref().with_extension(OFFSETS_FILE_EXTENSION))?;
        // SAFETY: the file is read-only and its descriptor is kept alive as long as the
        // mmap handle.
        let offset_mmap = unsafe { Mmap::map(&offset_file)? };

        // The first byte of the offsets file is the size of one offset in bytes.
        let offset_size = offset_mmap[0];

        // Ensure that the offset size is at least 1 byte and at most 8 bytes (`u64`).
        if offset_size > 8 {
            return Err(NippyJarError::OffsetSizeTooBig { offset_size })
        } else if offset_size == 0 {
            return Err(NippyJarError::OffsetSizeTooSmall { offset_size })
        }

        Ok(Self { data_file, data_mmap, offset_file, offset_mmap, offset_size })
    }

    /// Returns the offset for the requested data index.
    pub fn offset(&self, index: usize) -> Result<u64, NippyJarError> {
        // + 1 skips the leading byte that stores the offset size.
        let from = index * self.offset_size as usize + 1;

        self.offset_at(from)
    }

    /// Returns the offset for the requested data index, counting from the end of the
    /// offsets list.
    pub fn reverse_offset(&self, index: usize) -> Result<u64, NippyJarError> {
        let offsets_file_size = self.offset_file.metadata()?.len() as usize;

        if offsets_file_size > 1 {
            let from = offsets_file_size - self.offset_size as usize * (index + 1);

            self.offset_at(from)
        } else {
            Ok(0)
        }
    }

    /// Returns the total number of offsets in the file, derived from the file size and
    /// the per-offset byte width.
    pub fn offsets_count(&self) -> Result<usize, NippyJarError> {
        Ok((self.offset_file.metadata()?.len().saturating_sub(1) / self.offset_size as u64)
            as usize)
    }

    /// Reads one offset-sized little-endian value at the given byte position of the
    /// offsets file.
    fn offset_at(&self, index: usize) -> Result<u64, NippyJarError> {
        let mut buffer: [u8; 8] = [0; 8];

        let offset_end = index.saturating_add(self.offset_size as usize);
        if offset_end > self.offset_mmap.len() {
            return Err(NippyJarError::OffsetOutOfBounds { index })
        }

        buffer[..self.offset_size as usize].copy_from_slice(&self.offset_mmap[index..offset_end]);
        Ok(u64::from_le_bytes(buffer))
    }

    /// Returns the number of bytes that represent one offset.
    pub const fn offset_size(&self) -> u8 {
        self.offset_size
    }

    /// Returns the underlying data slice for the given byte range.
    pub fn data(&self, range: Range<usize>) -> &[u8] {
        &self.data_mmap[range]
    }

    /// Returns the total size of the data file.
    pub fn size(&self) -> usize {
        self.data_mmap.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use compression::Compression;
    use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
    use std::{fs::OpenOptions, io::Read};

    type ColumnResults<T> = Vec<ColumnResult<T>>;
    type ColumnValues = Vec<Vec<u8>>;

    /// Generates two columns of 100 random 32-byte values, optionally from a fixed seed.
    fn test_data(seed: Option<u64>) -> (ColumnValues, ColumnValues) {
        let value_length = 32;
        let num_rows = 100;

        let mut vec: Vec<u8> = vec![0; value_length];
        let mut rng = seed.map(SmallRng::seed_from_u64).unwrap_or_else(SmallRng::from_os_rng);

        let mut entry_gen = || {
            (0..num_rows)
                .map(|_| {
                    rng.fill_bytes(&mut vec[..]);
                    vec.clone()
                })
                .collect()
        };

        (entry_gen(), entry_gen())
    }

    /// Wraps every column value in `Ok`, as expected by `freeze`.
    fn clone_with_result(col: &ColumnValues) -> ColumnResults<Vec<u8>> {
        col.iter().map(|v| Ok(v.clone())).collect()
    }

    #[test]
    fn test_config_serialization() {
        let file = tempfile::NamedTempFile::new().unwrap();
        let jar = NippyJar::new_without_header(23, file.path()).with_lz4();
        jar.freeze_config().unwrap();

        let mut config_file = OpenOptions::new().read(true).open(jar.config_path()).unwrap();
        let config_file_len = config_file.metadata().unwrap().len();
        assert_eq!(config_file_len, 37);

        let mut buf = Vec::with_capacity(config_file_len as usize);
        config_file.read_to_end(&mut buf).unwrap();

        assert_eq!(
            vec![
                1, 0, 0, 0, 0, 0, 0, 0, 23, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0
            ],
            buf
        );

        let mut read_jar = bincode::deserialize_from::<_, NippyJar>(&buf[..]).unwrap();
        // `path` is not serialized, so it has to be restored manually.
        read_jar.path = file.path().to_path_buf();
        assert_eq!(jar, read_jar);
    }

    #[test]
    fn test_zstd_with_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        if let Some(Compressors::Zstd(zstd)) = nippy.compressor_mut() {
            assert!(matches!(zstd.compressors(), Err(NippyJarError::CompressorNotReady)));

            // The number of column value sets must match the configured column count.
            assert!(matches!(
                zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]),
                Err(NippyJarError::ColumnLenMismatch(columns, 3)) if columns == num_columns
            ));
        }

        // Freezing must fail while the compressor dictionaries are still untrained.
        assert!(matches!(
            nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows),
            Err(NippyJarError::CompressorNotReady)
        ));

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap();

        if let Some(Compressors::Zstd(zstd)) = nippy.compressor() {
            assert!(matches!(
                (&zstd.state, zstd.dictionaries.as_ref().map(|dict| dict.len())),
                (compression::ZstdState::Ready, Some(columns)) if columns == num_columns
            ));
        }

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy.version, loaded_nippy.version);
        assert_eq!(nippy.columns, loaded_nippy.columns);
        assert_eq!(nippy.filter, loaded_nippy.filter);
        assert_eq!(nippy.phf, loaded_nippy.phf);
        assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
        assert_eq!(nippy.path, loaded_nippy.path);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(zstd.use_dict);
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over the compressed rows and compare them with the source data.
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    #[test]
    fn test_lz4() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_lz4();
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over the compressed rows and compare them with the source data.
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Lz4 compressor")
        }
    }

    #[test]
    fn test_zstd_no_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(!zstd.use_dict);

            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over the compressed rows and compare them with the source data.
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    #[test]
    fn test_full_nippy_jar() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        let block_start = 500;

        #[derive(Serialize, Deserialize, Debug)]
        struct BlockJarHeader {
            block_start: usize,
        }

        // Write a jar with a user-defined header.
        {
            let mut nippy =
                NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
                    .with_zstd(true, 5000);

            nippy.prepare_compression(data.clone()).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read the jar back, including the user-defined header.
        {
            let loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();

            assert!(loaded_nippy.compressor().is_some());
            assert_eq!(loaded_nippy.user_header().block_start, block_start);

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Sequential read: compare each row with the source data.
                let mut row_num = 0usize;
                while let Some(row) = cursor.next_row().unwrap() {
                    assert_eq!(
                        (row[0], row[1]),
                        (data[0][row_num].as_slice(), data[1][row_num].as_slice())
                    );
                    row_num += 1;
                }

                // Random access: shuffle the rows and fetch each one by number.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                for (row_num, (v0, v1)) in data {
                    let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (v0, v1));
                }
            }
        }
    }

    #[test]
    fn test_selectable_column_values() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        // Write a jar.
        {
            let mut nippy =
                NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
            nippy.prepare_compression(data).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read the jar back, selecting columns with a bit mask.
        {
            let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Shuffle the rows so they are accessed in random order.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                // Both columns selected.
                const BLOCKS_FULL_MASK: usize = 0b11;

                for (row_num, (v0, v1)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_FULL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (*v0, *v1));
                }

                // Only the first column selected.
                const BLOCKS_BLOCK_MASK: usize = 0b01;
                for (row_num, (v0, _)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_BLOCK_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v0);
                }

                // Only the second column selected.
                const BLOCKS_WITHDRAWAL_MASK: usize = 0b10;
                for (row_num, (_, v1)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_WITHDRAWAL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v1);
                }

                // No columns selected.
                const BLOCKS_EMPTY_MASK: usize = 0b00;
                for (row_num, _) in &data {
                    assert!(cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_EMPTY_MASK)
                        .unwrap()
                        .unwrap()
                        .is_empty());
                }
            }
        }
    }

    #[test]
    fn test_writer() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Prune all rows, leaving an empty but consistent jar behind.
        prune_rows(num_columns, file_path.path(), &col1, &col2);

        // Append again on top of the pruned jar.
        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown before the offsets are committed.
        test_append_consistency_no_commit(file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown after a partial offsets commit.
        test_append_consistency_partial_commit(file_path.path(), &col1, &col2);
    }

    #[test]
    fn test_pruner() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let num_rows = 2;

        // (number of missing offsets, expected number of rows after the writer recovers)
        let missing_offsets_scenarios = [(1, 1), (2, 1), (3, 0)];

        for (missing_offsets, expected_rows) in missing_offsets_scenarios {
            let file_path = tempfile::NamedTempFile::new().unwrap();

            append_two_rows(num_columns, file_path.path(), &col1, &col2);

            simulate_interrupted_prune(num_columns, file_path.path(), num_rows, missing_offsets);

            let nippy = NippyJar::load_without_header(file_path.path()).unwrap();
            assert_eq!(nippy.rows, expected_rows);
        }
    }

    fn test_append_consistency_partial_commit(
        file_path: &Path,
        col1: &[Vec<u8>],
        col2: &[Vec<u8>],
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that will be used to verify the consistency recovery.
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Append a third row.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Simulate a partial commit by dropping the last offset before committing them.
        let _ = writer.offsets_mut().pop();

        // Commit the offsets, but not the configuration (the row count on disk stays).
        writer.commit_offsets().unwrap();

        drop(writer);

        // The row count is unchanged, while the data and offsets files have grown.
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // Only one additional 8-byte offset was committed, due to the popped offset.
        assert_eq!(
            initial_offset_size + 8,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // Creating a new writer must heal the files back to the committed state.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_offset_size,
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn test_append_consistency_no_commit(file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that will be used to verify the consistency recovery.
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Append a third row, but drop the writer without committing anything.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        drop(writer);

        // The row count and offsets file are unchanged, but the data file has grown.
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        assert_eq!(
            initial_offset_size,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // Creating a new writer must heal the data file back to the committed state.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn append_two_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        // Create a new jar and append the first row.
        {
            let nippy = NippyJar::new_without_header(num_columns, file_path);
            nippy.freeze_config().unwrap();
            assert_eq!(nippy.max_row_size, 0);
            assert_eq!(nippy.rows, 0);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[0]))).unwrap();
            assert_eq!(writer.column(), 1);
            assert!(writer.is_dirty());

            writer.append_column(Some(Ok(&col2[0]))).unwrap();
            assert!(writer.is_dirty());

            // Appending the last column of a row resets the column counter.
            assert_eq!(writer.column(), 0);

            // One offset per column value, plus the final offset.
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();
            assert!(!writer.is_dirty());

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 1);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }

        // Load the existing jar and append the second row.
        {
            let nippy = NippyJar::load_without_header(file_path).unwrap();
            assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
            assert_eq!(nippy.rows, 1);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[1]))).unwrap();
            assert_eq!(writer.column(), 1);

            writer.append_column(Some(Ok(&col2[1]))).unwrap();

            // Appending the last column of a row resets the column counter.
            assert_eq!(writer.column(), 0);

            // One offset per column value, plus the final offset.
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 2);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }
    }

    fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let mut writer = NippyJarWriter::new(nippy).unwrap();

        // Append a third row, so there are two committed rows and one uncommitted one.
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();
        assert!(writer.is_dirty());

        // Pruning two rows removes the uncommitted row and one of the committed rows.
        writer.prune_rows(2).unwrap();
        assert_eq!(writer.rows(), 1);

        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
            1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
        );

        let expected_data_size = col1[0].len() + col2[0].len();
        assert_eq!(
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize,
            expected_data_size
        );

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        {
            let data_reader = nippy.open_data_reader().unwrap();
            // Two column offsets remain, so index 2 is the final offset, which should
            // match the size of the remaining data.
            assert_eq!(data_reader.offset(2).unwrap(), expected_data_size as u64);
        }

        // Pruning the last remaining row leaves an empty jar.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.prune_rows(1).unwrap();
        assert!(writer.is_dirty());

        assert_eq!(writer.rows(), 0);
        assert_eq!(writer.max_row_size(), 0);
        assert_eq!(File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize, 0);
        // Only the size byte and the final 8-byte offset remain in the offsets file.
        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize,
            9
        );
        writer.commit().unwrap();
        assert!(!writer.is_dirty());
    }

    fn simulate_interrupted_prune(
        num_columns: usize,
        file_path: &Path,
        num_rows: u64,
        missing_offsets: u64,
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let reader = nippy.open_data_reader().unwrap();
        let offsets_file =
            OpenOptions::new().read(true).write(true).open(nippy.offsets_path()).unwrap();
        let offsets_len = 1 + num_rows * num_columns as u64 * 8 + 8;
        assert_eq!(offsets_len, offsets_file.metadata().unwrap().len());

        let data_file = OpenOptions::new().read(true).write(true).open(nippy.data_path()).unwrap();
        let data_len = reader.reverse_offset(0).unwrap();
        assert_eq!(data_len, data_file.metadata().unwrap().len());

        // Each offset points to a 32-byte column value. Simulate an interrupted prune by
        // truncating the data file while leaving the offsets file untouched.
        data_file.set_len(data_len - 32 * missing_offsets).unwrap();

        // Creating a new writer must detect the inconsistency and heal the files.
        let _ = NippyJarWriter::new(nippy).unwrap();
    }
}