//! Immutable data store format.

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]

use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
    error::Error as StdError,
    fs::File,
    io::{self, Read, Write},
    ops::Range,
    path::{Path, PathBuf},
};
use tracing::*;

/// Compression algorithms supported by the jar format.
pub mod compression;
#[cfg(test)]
use compression::Compression;
use compression::Compressors;

/// Placeholder for perfect-hashing functions. Currently uninhabited.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum Functions {}

/// Placeholder for inclusion filters. Currently uninhabited.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum InclusionFilters {}

mod error;
pub use error::NippyJarError;

mod cursor;
pub use cursor::NippyJarCursor;

mod writer;
pub use writer::NippyJarWriter;

mod consistency;
pub use consistency::NippyJarChecker;

/// The version of the jar format.
const NIPPY_JAR_VERSION: usize = 1;
/// The file extension of the index file.
const INDEX_FILE_EXTENSION: &str = "idx";
/// The file extension of the offsets file.
const OFFSETS_FILE_EXTENSION: &str = "off";
/// The file extension of the configuration file.
pub const CONFIG_FILE_EXTENSION: &str = "conf";

/// A row of column values, each borrowed from an internal buffer or a memory-mapped file.
type RefRow<'a> = Vec<&'a [u8]>;

/// Alias for a column value wrapped in a boxed, thread-safe error.
pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;

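/// Marker trait for user-defined header data, serialized into the jar's configuration file.
///
/// The blanket implementation below covers every type that satisfies the listed bounds, so no
/// manual implementation is needed.
///
/// # Example
///
/// A minimal sketch of a custom header; `BlockRangeHeader` is illustrative and not part of this
/// crate:
///
/// ```ignore
/// #[derive(Debug, serde::Serialize, serde::Deserialize)]
/// struct BlockRangeHeader {
///     first_block: u64,
///     last_block: u64,
/// }
///
/// // Satisfies `NippyJarHeader` automatically via the blanket impl.
/// let jar = NippyJar::new(2, path, BlockRangeHeader { first_block: 0, last_block: 100 });
/// ```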
pub trait NippyJarHeader:
    Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

impl<T> NippyJarHeader for T where
    T: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

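/// `NippyJar` is a specialized storage format designed for immutable data.
///
/// Data is organized in a columnar layout, which enables column-based compression. Retrieval
/// consults the offsets file and reads the data file through a memory map.
///
/// # Example
///
/// A minimal sketch of building a two-column jar; the path is illustrative:
///
/// ```ignore
/// use std::path::Path;
///
/// let jar = NippyJar::new_without_header(2, Path::new("/tmp/blocks")).with_lz4();
/// assert!(jar.compressor().is_some());
/// ```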
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NippyJar<H = ()> {
    /// The version of the jar format.
    version: usize,
    /// User-defined header data. Defaults to the zero-sized unit type `()`.
    user_header: H,
    /// Number of data columns in the jar.
    columns: usize,
    /// Number of data rows in the jar.
    rows: usize,
    /// Optional compression algorithm applied to the data.
    compressor: Option<Compressors>,
    /// Optional filter for data membership checks. Not serialized.
    #[serde(skip)]
    filter: Option<InclusionFilters>,
    /// Optional perfect-hashing function mapping unique keys to row indices. Not serialized.
    #[serde(skip)]
    phf: Option<Functions>,
    /// Maximum uncompressed row size of the set. Allows decompression without resizing the
    /// output buffer.
    max_row_size: usize,
    /// Data path of the file. Supporting files append an extension to this path. Not
    /// serialized; restored on load.
    #[serde(skip)]
    path: PathBuf,
}

impl<H: NippyJarHeader> std::fmt::Debug for NippyJar<H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NippyJar")
            .field("version", &self.version)
            .field("user_header", &self.user_header)
            .field("rows", &self.rows)
            .field("columns", &self.columns)
            .field("compressor", &self.compressor)
            .field("filter", &self.filter)
            .field("phf", &self.phf)
            .field("path", &self.path)
            .field("max_row_size", &self.max_row_size)
            .finish_non_exhaustive()
    }
}

impl NippyJar<()> {
    /// Creates a new [`NippyJar`] without user-defined header data.
    pub fn new_without_header(columns: usize, path: &Path) -> Self {
        Self::new(columns, path, ())
    }

    /// Loads the file configuration and returns [`Self`] for a jar without user-defined
    /// header data.
    pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
        Self::load(path)
    }
}

impl<H: NippyJarHeader> NippyJar<H> {
    /// Creates a new [`NippyJar`] with user-defined header data.
    pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
        Self {
            version: NIPPY_JAR_VERSION,
            user_header,
            columns,
            rows: 0,
            max_row_size: 0,
            compressor: None,
            filter: None,
            phf: None,
            path: path.to_path_buf(),
        }
    }

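    /// Adds [`compression::Zstd`] compression, optionally trained with per-column dictionaries.
    ///
    /// # Example
    ///
    /// A minimal sketch; the 5000-byte dictionary budget mirrors the tests below:
    ///
    /// ```ignore
    /// let jar = NippyJar::new_without_header(2, path).with_zstd(true, 5000);
    /// assert!(jar.compressor().is_some());
    /// ```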
    pub fn with_zstd(mut self, use_dict: bool, max_dict_size: usize) -> Self {
        self.compressor =
            Some(Compressors::Zstd(compression::Zstd::new(use_dict, max_dict_size, self.columns)));
        self
    }

    /// Adds [`compression::Lz4`] compression.
    pub fn with_lz4(mut self) -> Self {
        self.compressor = Some(Compressors::Lz4(compression::Lz4::default()));
        self
    }

    /// Gets a reference to the user header.
    pub const fn user_header(&self) -> &H {
        &self.user_header
    }

    /// Gets the total number of columns in the jar.
    pub const fn columns(&self) -> usize {
        self.columns
    }

    /// Gets the total number of rows in the jar.
    pub const fn rows(&self) -> usize {
        self.rows
    }

    /// Gets a reference to the compressor, if any.
    pub const fn compressor(&self) -> Option<&Compressors> {
        self.compressor.as_ref()
    }

    /// Gets a mutable reference to the compressor, if any.
    pub const fn compressor_mut(&mut self) -> Option<&mut Compressors> {
        self.compressor.as_mut()
    }

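    /// Loads the file configuration and returns [`Self`].
    ///
    /// **The caller must ensure that `H` matches the header type the jar was created with**,
    /// otherwise deserializing the configuration will fail.
    ///
    /// # Example
    ///
    /// A minimal sketch of a load round-trip; the path is illustrative:
    ///
    /// ```ignore
    /// let jar: NippyJar<()> = NippyJar::load_without_header(Path::new("/tmp/blocks"))?;
    /// println!("rows: {}", jar.rows());
    /// ```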
    pub fn load(path: &Path) -> Result<Self, NippyJarError> {
        // The configuration file lives next to the data file, with its own extension.
        let config_path = path.with_extension(CONFIG_FILE_EXTENSION);
        let config_file = File::open(&config_path)
            .inspect_err(|e| {
                warn!(?path, %e, "Failed to load static file jar");
            })
            .map_err(|err| reth_fs_util::FsPathError::open(err, config_path))?;

        let mut obj = Self::load_from_reader(io::BufReader::new(config_file))?;
        // `path` is not serialized, so it is restored from the argument.
        obj.path = path.to_path_buf();
        Ok(obj)
    }

    /// Deserializes the configuration from a reader and returns [`Self`].
    pub fn load_from_reader<R: Read>(reader: R) -> Result<Self, NippyJarError> {
        Ok(bincode::deserialize_from(reader)?)
    }

    /// Serializes the configuration into a writer.
    pub fn save_to_writer<W: Write>(&self, writer: W) -> Result<(), NippyJarError> {
        Ok(bincode::serialize_into(writer, self)?)
    }

    /// Returns the path of the data file.
    pub fn data_path(&self) -> &Path {
        self.path.as_ref()
    }

    /// Returns the path of the index file.
    pub fn index_path(&self) -> PathBuf {
        self.path.with_extension(INDEX_FILE_EXTENSION)
    }

    /// Returns the path of the offsets file.
    pub fn offsets_path(&self) -> PathBuf {
        self.path.with_extension(OFFSETS_FILE_EXTENSION)
    }

    /// Returns the path of the configuration file.
    pub fn config_path(&self) -> PathBuf {
        self.path.with_extension(CONFIG_FILE_EXTENSION)
    }

    /// Deletes the data, index, offsets and configuration files of this jar, if they exist.
    pub fn delete(self) -> Result<(), NippyJarError> {
        for path in
            [self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()]
        {
            if path.exists() {
                debug!(target: "nippy-jar", ?path, "Removing file.");
                reth_fs_util::remove_file(path)?;
            }
        }

        Ok(())
    }

    /// Opens a [`DataReader`] over the data and offsets files.
    pub fn open_data_reader(&self) -> Result<DataReader, NippyJarError> {
        DataReader::new(self.data_path())
    }

    /// Atomically writes the configuration to its file.
    fn freeze_config(&self) -> Result<(), NippyJarError> {
        Ok(reth_fs_util::atomic_write_file(&self.config_path(), |file| self.save_to_writer(file))?)
    }
}

#[cfg(test)]
impl<H: NippyJarHeader> NippyJar<H> {
    /// If a compressor is set, runs an early pass over the data to prepare it (e.g. train zstd
    /// dictionaries).
    pub fn prepare_compression(
        &mut self,
        columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
    ) -> Result<(), NippyJarError> {
        if let Some(compression) = &mut self.compressor {
            debug!(target: "nippy-jar", columns=columns.len(), "Preparing compression.");
            compression.prepare_compression(columns)?;
        }
        Ok(())
    }

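    /// Writes all data to the data file and the offsets to a companion file, returning the
    /// updated jar.
    ///
    /// # Example
    ///
    /// A minimal sketch mirroring the tests below; each column is an iterator of
    /// `ColumnResult<Vec<u8>>` values:
    ///
    /// ```ignore
    /// let jar = jar.freeze(vec![col1_values, col2_values], num_rows)?;
    /// ```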
    pub fn freeze(
        self,
        columns: Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
        total_rows: u64,
    ) -> Result<Self, NippyJarError> {
        self.check_before_freeze(&columns)?;

        debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");

        let mut writer = NippyJarWriter::new(self)?;

        writer.append_rows(columns, total_rows)?;

        writer.commit()?;

        debug!(target: "nippy-jar", ?writer, "Finished writing data.");

        Ok(writer.into_jar())
    }

    /// Safety checks before creating a [`NippyJarWriter`] over the data.
    fn check_before_freeze(
        &self,
        columns: &[impl IntoIterator<Item = ColumnResult<Vec<u8>>>],
    ) -> Result<(), NippyJarError> {
        if columns.len() != self.columns {
            return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
        }

        if let Some(compression) = &self.compressor &&
            !compression.is_ready()
        {
            return Err(NippyJarError::CompressorNotReady)
        }

        Ok(())
    }
}

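/// Manages reading of the data and offsets files via memory maps.
///
/// The first byte of the offsets file stores the size in bytes of a single offset (1..=8); the
/// rest of the file is a sequence of little-endian offsets into the data file.
///
/// # Example
///
/// A minimal sketch of reading the first row's first column value; assumes the jar has already
/// been written:
///
/// ```ignore
/// let reader = jar.open_data_reader()?;
/// let start = reader.offset(0)? as usize;
/// let end = reader.offset(1)? as usize;
/// let first_value: &[u8] = reader.data(start..end);
/// ```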
#[derive(Debug)]
pub struct DataReader {
    /// Data file descriptor. Kept alive for the lifetime of `data_mmap`.
    #[expect(dead_code)]
    data_file: File,
    /// Memory map over the data file.
    data_mmap: Mmap,
    /// Offsets file descriptor. Kept alive for the lifetime of `offset_mmap`.
    offset_file: File,
    /// Memory map over the offsets file.
    offset_mmap: Mmap,
    /// Number of bytes that represent one offset.
    offset_size: u8,
}

impl DataReader {
    /// Opens the data and offsets files at `path` and returns a [`DataReader`].
    pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
        let data_file = File::open(path.as_ref())?;
        // SAFETY: the file descriptor is kept alive alongside the map, and the file must not
        // be truncated while mapped.
        let data_mmap = unsafe { Mmap::map(&data_file)? };

        let offset_file = File::open(path.as_ref().with_extension(OFFSETS_FILE_EXTENSION))?;
        let offset_mmap = unsafe { Mmap::map(&offset_file)? };

        // The first byte of the offsets file encodes the size of one offset.
        let offset_size = offset_mmap[0];

        // Ensure that the offset size is within the valid range (1 to 8 bytes).
        if offset_size > 8 {
            return Err(NippyJarError::OffsetSizeTooBig { offset_size })
        } else if offset_size == 0 {
            return Err(NippyJarError::OffsetSizeTooSmall { offset_size })
        }

        Ok(Self { data_file, data_mmap, offset_file, offset_size, offset_mmap })
    }

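    /// Returns the offset for the requested data index.
    ///
    /// Offsets are fixed-size little-endian integers laid out back-to-back after the leading
    /// size byte, so the byte position of offset `index` is `index * offset_size + 1`.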
    pub fn offset(&self, index: usize) -> Result<u64, NippyJarError> {
        // `+ 1` skips the leading byte that stores the offset size.
        let from = index * self.offset_size as usize + 1;

        self.offset_at(from)
    }

    /// Returns the offset for the requested data index, counting from the end of the offsets
    /// file.
    pub fn reverse_offset(&self, index: usize) -> Result<u64, NippyJarError> {
        let offsets_file_size = self.offset_file.metadata()?.len() as usize;

        if offsets_file_size > 1 {
            let from = offsets_file_size - self.offset_size as usize * (index + 1);

            self.offset_at(from)
        } else {
            Ok(0)
        }
    }

    /// Returns the total number of offsets in the file, excluding the leading size byte.
    pub fn offsets_count(&self) -> Result<usize, NippyJarError> {
        Ok((self.offset_file.metadata()?.len().saturating_sub(1) / self.offset_size as u64)
            as usize)
    }

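    /// Reads one offset-sized (see [`Self::offset_size`]) little-endian `u64` at the given byte
    /// position of the offsets memory map.
    ///
    /// # Example
    ///
    /// With `offset_size == 2`, the bytes `[0x34, 0x12]` fill only the first two bytes of the
    /// 8-byte buffer; the rest stays zero, and `u64::from_le_bytes` yields `0x1234`.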
    fn offset_at(&self, index: usize) -> Result<u64, NippyJarError> {
        let mut buffer: [u8; 8] = [0; 8];

        let offset_end = index.saturating_add(self.offset_size as usize);
        if offset_end > self.offset_mmap.len() {
            return Err(NippyJarError::OffsetOutOfBounds { index })
        }

        buffer[..self.offset_size as usize].copy_from_slice(&self.offset_mmap[index..offset_end]);
        Ok(u64::from_le_bytes(buffer))
    }

    /// Returns the number of bytes that represent one offset.
    pub const fn offset_size(&self) -> u8 {
        self.offset_size
    }

    /// Returns the bytes for the given range from the data memory map.
    pub fn data(&self, range: Range<usize>) -> &[u8] {
        &self.data_mmap[range]
    }

    /// Returns the total size of the data file.
    pub fn size(&self) -> usize {
        self.data_mmap.len()
    }

    /// Returns the total size of the offsets file.
    pub fn offsets_size(&self) -> usize {
        self.offset_mmap.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use compression::Compression;
    use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
    use std::{fs::OpenOptions, io::Read};

    type ColumnResults<T> = Vec<ColumnResult<T>>;
    type ColumnValues = Vec<Vec<u8>>;

    /// Generates two columns of 100 random 32-byte values, optionally seeded.
    fn test_data(seed: Option<u64>) -> (ColumnValues, ColumnValues) {
        let value_length = 32;
        let num_rows = 100;

        let mut vec: Vec<u8> = vec![0; value_length];
        let mut rng = seed.map(SmallRng::seed_from_u64).unwrap_or_else(SmallRng::from_os_rng);

        let mut entry_gen = || {
            (0..num_rows)
                .map(|_| {
                    rng.fill_bytes(&mut vec[..]);
                    vec.clone()
                })
                .collect()
        };

        (entry_gen(), entry_gen())
    }

    fn clone_with_result(col: &ColumnValues) -> ColumnResults<Vec<u8>> {
        col.iter().map(|v| Ok(v.clone())).collect()
    }

    #[test]
    fn test_config_serialization() {
        let file = tempfile::NamedTempFile::new().unwrap();
        let jar = NippyJar::new_without_header(23, file.path()).with_lz4();
        jar.freeze_config().unwrap();

        let mut config_file = OpenOptions::new().read(true).open(jar.config_path()).unwrap();
        let config_file_len = config_file.metadata().unwrap().len();
        assert_eq!(config_file_len, 37);

        let mut buf = Vec::with_capacity(config_file_len as usize);
        config_file.read_to_end(&mut buf).unwrap();

        assert_eq!(
            vec![
                1, 0, 0, 0, 0, 0, 0, 0, 23, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0
            ],
            buf
        );

        let mut read_jar = bincode::deserialize_from::<_, NippyJar>(&buf[..]).unwrap();
        // `path` is skipped during (de)serialization, so it has to be restored manually.
        read_jar.path = file.path().to_path_buf();
        assert_eq!(jar, read_jar);
    }

    #[test]
    fn test_zstd_with_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor_mut() {
            assert!(matches!(zstd.compressors(), Err(NippyJarError::CompressorNotReady)));

            // The number of column iterators must match the jar's column count.
            assert!(matches!(
                zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]),
                Err(NippyJarError::ColumnLenMismatch(columns, 3)) if columns == num_columns
            ));
        }

        // Freezing must fail while the zstd dictionaries are still untrained.
        assert!(matches!(
            nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows),
            Err(NippyJarError::CompressorNotReady)
        ));

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap();

        if let Some(Compressors::Zstd(zstd)) = &nippy.compressor() {
            assert!(matches!(
                (&zstd.state, zstd.dictionaries.as_ref().map(|dict| dict.len())),
                (compression::ZstdState::Ready, Some(columns)) if columns == num_columns
            ));
        }

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy.version, loaded_nippy.version);
        assert_eq!(nippy.columns, loaded_nippy.columns);
        assert_eq!(nippy.filter, loaded_nippy.filter);
        assert_eq!(nippy.phf, loaded_nippy.phf);
        assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
        assert_eq!(nippy.path, loaded_nippy.path);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(zstd.use_dict);
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            let mut row_index = 0usize;
            // Iterate over every row and ensure it matches the original data.
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    #[test]
    fn test_lz4() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_lz4();
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            let mut row_index = 0usize;
            // Iterate over every row and ensure it matches the original data.
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Lz4 compressor")
        }
    }

    #[test]
    fn test_zstd_no_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(!zstd.use_dict);

            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            let mut row_index = 0usize;
            // Iterate over every row and ensure it matches the original data.
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    #[test]
    fn test_full_nippy_jar() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        let block_start = 500;

        #[derive(Serialize, Deserialize, Debug)]
        struct BlockJarHeader {
            block_start: usize,
        }

        {
            // Create a jar with a user-defined header.
            let mut nippy =
                NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
                    .with_zstd(true, 5000);

            nippy.prepare_compression(data.clone()).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        {
            // Read the jar back, including the user-defined header.
            let loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();

            assert!(loaded_nippy.compressor().is_some());
            assert_eq!(loaded_nippy.user_header().block_start, block_start);

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                let mut row_num = 0usize;
                // Iterate over the rows sequentially and check them against the original data.
                while let Some(row) = cursor.next_row().unwrap() {
                    assert_eq!(
                        (row[0], row[1]),
                        (data[0][row_num].as_slice(), data[1][row_num].as_slice())
                    );
                    row_num += 1;
                }

                // Access rows in random order by row number.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                for (row_num, (v0, v1)) in data {
                    let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (v0, v1));
                }
            }
        }
    }

    #[test]
    fn test_selectable_column_values() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        {
            // Create a jar with two columns.
            let mut nippy =
                NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
            nippy.prepare_compression(data).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        {
            // Read the jar back with partial column selections.
            let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Shuffle the rows so they are accessed out of order.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::rng());

                // Select all columns.
                const BLOCKS_FULL_MASK: usize = 0b11;

                for (row_num, (v0, v1)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_FULL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (*v0, *v1));
                }

                // Select only the first column.
                const BLOCKS_BLOCK_MASK: usize = 0b01;
                for (row_num, (v0, _)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_BLOCK_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v0);
                }

                // Select only the second column.
                const BLOCKS_WITHDRAWAL_MASK: usize = 0b10;
                for (row_num, (_, v1)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_WITHDRAWAL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v1);
                }

                // An empty mask selects no columns.
                const BLOCKS_EMPTY_MASK: usize = 0b00;
                for (row_num, _) in &data {
                    assert!(cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_EMPTY_MASK)
                        .unwrap()
                        .unwrap()
                        .is_empty());
                }
            }
        }
    }

    #[test]
    fn test_writer() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Appends a third row and prunes two, exercising pruning both in memory and on file.
        prune_rows(num_columns, file_path.path(), &col1, &col2);

        // New rows can be appended after pruning.
        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown before commit and check that the writer unwinds.
        test_append_consistency_no_commit(file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown mid-commit and check that the writer unwinds.
        test_append_consistency_partial_commit(file_path.path(), &col1, &col2);
    }

    #[test]
    fn test_pruner() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let num_rows = 2;

        // Each scenario: (number of offsets left dangling by truncating the data file,
        // expected number of rows after the consistency check heals the files).
        let missing_offsets_scenarios = [(1, 1), (2, 1), (3, 0)];

        for (missing_offsets, expected_rows) in missing_offsets_scenarios {
            let file_path = tempfile::NamedTempFile::new().unwrap();

            append_two_rows(num_columns, file_path.path(), &col1, &col2);

            simulate_interrupted_prune(num_columns, file_path.path(), num_rows, missing_offsets);

            let nippy = NippyJar::load_without_header(file_path.path()).unwrap();
            assert_eq!(nippy.rows, expected_rows);
        }
    }

    fn test_append_consistency_partial_commit(
        file_path: &Path,
        col1: &[Vec<u8>],
        col2: &[Vec<u8>],
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Baseline sizes of the data and offsets files before the new append.
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a third row.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Drop the last offset and commit only the offsets, simulating a partial commit.
        let _ = writer.offsets_mut().pop();

        writer.commit_offsets().unwrap();

        // Simulate an unexpected shutdown before `commit()` can finish.
        drop(writer);

        // The config still reports the old row count.
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // The data file was extended by the uncommitted row.
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // The offsets file gained only one 8-byte offset, since the other was popped.
        assert_eq!(
            initial_offset_size + 8,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // Creating a new writer runs the consistency check and heals both files back to the
        // last committed state.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_offset_size,
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn test_append_consistency_no_commit(file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Baseline sizes of the data and offsets files before the new append.
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a third row without committing anything.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Simulate an unexpected shutdown before `commit()` is called.
        drop(writer);

        // The config still reports the old row count.
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // The data file was extended by the uncommitted row.
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // The offsets file is unchanged.
        assert_eq!(
            initial_offset_size,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // Creating a new writer runs the consistency check and truncates the data file back
        // to the last committed state.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn append_two_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        {
            // Create a new jar and append the first row.
            let nippy = NippyJar::new_without_header(num_columns, file_path);
            nippy.freeze_config().unwrap();
            assert_eq!(nippy.max_row_size, 0);
            assert_eq!(nippy.rows, 0);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[0]))).unwrap();
            assert_eq!(writer.column(), 1);
            assert!(writer.is_dirty());

            writer.append_column(Some(Ok(&col2[0]))).unwrap();
            assert!(writer.is_dirty());

            // Appending the last column of a row resets the column index.
            assert_eq!(writer.column(), 0);

            // Offsets mark value boundaries: two column values yield three offsets.
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();
            assert!(!writer.is_dirty());

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 1);
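            // Offsets file layout: 1 leading byte storing the offset size, one 8-byte offset
            // per column value written, plus one trailing offset marking the end of the data
            // file.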
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }

        {
            // Load the existing jar and append the second row.
            let nippy = NippyJar::load_without_header(file_path).unwrap();
            assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
            assert_eq!(nippy.rows, 1);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[1]))).unwrap();
            assert_eq!(writer.column(), 1);

            writer.append_column(Some(Ok(&col2[1]))).unwrap();

            // Appending the last column of a row resets the column index.
            assert_eq!(writer.column(), 0);

            // Offsets mark value boundaries: two column values yield three offsets.
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 2);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }
    }

    fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let mut writer = NippyJarWriter::new(nippy).unwrap();

        // Appends a third, uncommitted row.
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();
        assert!(writer.is_dirty());

        // Prunes two rows: the uncommitted one and one committed row.
        writer.prune_rows(2).unwrap();
        assert_eq!(writer.rows(), 1);

        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
            1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
        );

        let expected_data_size = col1[0].len() + col2[0].len();
        assert_eq!(
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize,
            expected_data_size
        );

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        {
            let data_reader = nippy.open_data_reader().unwrap();
            // One row of two columns leaves three offsets; the last one (index 2) marks the
            // end of the data and must equal the data file size.
            assert_eq!(data_reader.offset(2).unwrap(), expected_data_size as u64);
        }

        // Prune the last remaining row.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.prune_rows(1).unwrap();
        assert!(writer.is_dirty());

        assert_eq!(writer.rows(), 0);
        assert_eq!(writer.max_row_size(), 0);
        assert_eq!(File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize, 0);
        // Only the leading size byte and the end-of-data offset remain: 1 + 8 bytes.
        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize,
            9
        );
        writer.commit().unwrap();
        assert!(!writer.is_dirty());
    }

    fn simulate_interrupted_prune(
        num_columns: usize,
        file_path: &Path,
        num_rows: u64,
        missing_offsets: u64,
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let reader = nippy.open_data_reader().unwrap();
        let offsets_file =
            OpenOptions::new().read(true).write(true).open(nippy.offsets_path()).unwrap();
        let offsets_len = 1 + num_rows * num_columns as u64 * 8 + 8;
        assert_eq!(offsets_len, offsets_file.metadata().unwrap().len());

        let data_file = OpenOptions::new().read(true).write(true).open(nippy.data_path()).unwrap();
        let data_len = reader.reverse_offset(0).unwrap();
        assert_eq!(data_len, data_file.metadata().unwrap().len());

        // Simulate an interrupted prune by truncating the data file while leaving the offsets
        // file untouched. Each test value is 32 bytes long, so removing `missing_offsets`
        // values shortens the data file by `32 * missing_offsets` bytes.
        data_file.set_len(data_len - 32 * missing_offsets).unwrap();

        // Creating a writer runs the consistency check, which heals the files.
        let _ = NippyJarWriter::new(nippy).unwrap();
    }
}