//! Immutable data store format.
//!
//! A `NippyJar` stores rows of column values in a flat data file, with the
//! per-value byte offsets kept in a sibling `.off` file and the configuration
//! serialized to a sibling `.conf` file.

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
error::Error as StdError,
fs::File,
io::Read,
ops::Range,
path::{Path, PathBuf},
};
use tracing::*;
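
/// Compression algorithms supported by `NippyJar`.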
pub mod compression;
#[cfg(test)]
use compression::Compression;
use compression::Compressors;
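
/// Perfect-hashing functions that a [`NippyJar`] may use for indexing.
/// Currently has no variants.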
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum Functions {}
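
/// Inclusion filters that a [`NippyJar`] may use to answer membership queries.
/// Currently has no variants.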
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum InclusionFilters {}
mod error;
pub use error::NippyJarError;

mod cursor;
pub use cursor::NippyJarCursor;

mod writer;
pub use writer::NippyJarWriter;

mod consistency;
pub use consistency::NippyJarChecker;
/// The version of the `NippyJar` format, serialized into every configuration file.
const NIPPY_JAR_VERSION: usize = 1;
/// File extension of the index file.
const INDEX_FILE_EXTENSION: &str = "idx";
/// File extension of the offsets file.
const OFFSETS_FILE_EXTENSION: &str = "off";
/// File extension of the configuration file.
pub const CONFIG_FILE_EXTENSION: &str = "conf";

/// A row of column values, each borrowed as a byte slice.
type RefRow<'a> = Vec<&'a [u8]>;

/// Alias type for a column value wrapped in a boxed-error `Result`.
pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;
/// A trait for the user-defined header of a [`NippyJar`]. Blanket-implemented
/// for every type that satisfies the listed bounds.
pub trait NippyJarHeader:
    Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

impl<T> NippyJarHeader for T where
    T: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}
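
/// `NippyJar` is a specialized storage format designed for immutable data.
///
/// The struct doubles as the on-disk configuration: it is serialized to a
/// sibling `.conf` file, while the row data lives in the data file and the
/// per-value byte offsets in a sibling `.off` file.
///
/// # Example
///
/// A minimal sketch of the write/read round trip; `path`, the payloads, and
/// the error handling are illustrative:
///
/// ```ignore
/// // Two columns, no user-defined header, no compression.
/// let jar = NippyJar::new_without_header(2, path);
///
/// let mut writer = NippyJarWriter::new(jar)?;
/// writer.append_column(Some(Ok(&b"row0,col0"[..])))?;
/// writer.append_column(Some(Ok(&b"row0,col1"[..])))?;
/// writer.commit()?;
///
/// let jar = NippyJar::load_without_header(path)?;
/// let mut cursor = NippyJarCursor::new(&jar)?;
/// while let Some(row) = cursor.next_row()? {
///     // `row[0]` and `row[1]` hold the two column values of the row.
/// }
/// ```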
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NippyJar<H = ()> {
    /// The version of the `NippyJar` format.
    version: usize,
    /// User-defined header data.
    user_header: H,
    /// Number of data columns in the jar.
    columns: usize,
    /// Number of data rows in the jar.
    rows: usize,
    /// Optional compression algorithm applied to the data.
    compressor: Option<Compressors>,
    /// Optional inclusion filter. Not serialized; [`InclusionFilters`] has no
    /// variants yet.
    #[serde(skip)]
    filter: Option<InclusionFilters>,
    /// Optional perfect-hashing function. Not serialized; [`Functions`] has no
    /// variants yet.
    #[serde(skip)]
    phf: Option<Functions>,
    /// Size in bytes of the biggest row.
    max_row_size: usize,
    /// Path of the data file. Not serialized; set when the jar is created or
    /// its configuration is loaded.
    #[serde(skip)]
    path: PathBuf,
}
impl<H: NippyJarHeader> std::fmt::Debug for NippyJar<H> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NippyJar")
.field("version", &self.version)
.field("user_header", &self.user_header)
.field("rows", &self.rows)
.field("columns", &self.columns)
.field("compressor", &self.compressor)
.field("filter", &self.filter)
.field("phf", &self.phf)
.field("path", &self.path)
.field("max_row_size", &self.max_row_size)
.finish_non_exhaustive()
}
}
impl NippyJar<()> {
    /// Creates a new [`NippyJar`] without a user-defined header.
    pub fn new_without_header(columns: usize, path: &Path) -> Self {
        Self::new(columns, path, ())
    }

    /// Loads the configuration of a jar without a user-defined header.
    pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
        Self::load(path)
    }
}
impl<H: NippyJarHeader> NippyJar<H> {
    /// Creates a new [`NippyJar`] with a user-defined header, `columns`
    /// columns, and no rows or compression configured.
    pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
        Self {
            version: NIPPY_JAR_VERSION,
            user_header,
            columns,
            rows: 0,
            max_row_size: 0,
            compressor: None,
            filter: None,
            phf: None,
            path: path.to_path_buf(),
        }
    }
    /// Adds zstd compression, optionally training per-column dictionaries of
    /// at most `max_dict_size` bytes.
    pub fn with_zstd(mut self, use_dict: bool, max_dict_size: usize) -> Self {
        self.compressor =
            Some(Compressors::Zstd(compression::Zstd::new(use_dict, max_dict_size, self.columns)));
        self
    }

    /// Adds lz4 compression.
    pub fn with_lz4(mut self) -> Self {
        self.compressor = Some(Compressors::Lz4(compression::Lz4::default()));
        self
    }
    /// Returns the user-defined header.
    pub const fn user_header(&self) -> &H {
        &self.user_header
    }

    /// Returns the number of columns.
    pub const fn columns(&self) -> usize {
        self.columns
    }

    /// Returns the number of rows.
    pub const fn rows(&self) -> usize {
        self.rows
    }

    /// Returns the compressor, if any.
    pub const fn compressor(&self) -> Option<&Compressors> {
        self.compressor.as_ref()
    }

    /// Returns a mutable reference to the compressor, if any.
    pub fn compressor_mut(&mut self) -> Option<&mut Compressors> {
        self.compressor.as_mut()
    }
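
    /// Loads the configuration from the sibling `.conf` file and returns
    /// [`Self`], with `path` recorded as the data-file path.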
pub fn load(path: &Path) -> Result<Self, NippyJarError> {
let config_path = path.with_extension(CONFIG_FILE_EXTENSION);
let config_file = File::open(&config_path)
.map_err(|err| reth_fs_util::FsPathError::open(err, config_path))?;
let mut obj = Self::load_from_reader(config_file)?;
obj.path = path.to_path_buf();
Ok(obj)
}
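
    /// Deserializes the configuration from a reader of bincode-encoded bytes,
    /// e.g. an open configuration file.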
pub fn load_from_reader<R: Read>(reader: R) -> Result<Self, NippyJarError> {
Ok(bincode::deserialize_from(reader)?)
}
    /// Returns the path of the data file.
    pub fn data_path(&self) -> &Path {
        self.path.as_ref()
    }

    /// Returns the path of the index file.
    pub fn index_path(&self) -> PathBuf {
        self.path.with_extension(INDEX_FILE_EXTENSION)
    }

    /// Returns the path of the offsets file.
    pub fn offsets_path(&self) -> PathBuf {
        self.path.with_extension(OFFSETS_FILE_EXTENSION)
    }

    /// Returns the path of the configuration file.
    pub fn config_path(&self) -> PathBuf {
        self.path.with_extension(CONFIG_FILE_EXTENSION)
    }
    /// Deletes the data, index, offsets, and configuration files, if they exist.
    pub fn delete(self) -> Result<(), NippyJarError> {
        for path in
            [self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()]
        {
            if path.exists() {
                reth_fs_util::remove_file(path)?;
            }
        }
        Ok(())
    }

    /// Opens a [`DataReader`] over the data and offsets files.
    pub fn open_data_reader(&self) -> Result<DataReader, NippyJarError> {
        DataReader::new(self.data_path())
    }

    /// Persists the configuration to its file via
    /// `reth_fs_util::atomic_write_file`.
    fn freeze_config(&self) -> Result<(), NippyJarError> {
        Ok(reth_fs_util::atomic_write_file(&self.config_path(), |file| {
            bincode::serialize_into(file, &self)
        })?)
    }
}
#[cfg(test)]
impl<H: NippyJarHeader> NippyJar<H> {
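    /// Trains the configured compressor on a sample of the data, one `Vec` of
    /// values per column. Dictionary-based zstd compression requires this
    /// before [`Self::freeze`]; other compressors treat it as a no-op.
    ///
    /// A sketch of the intended call order (column samples illustrative):
    ///
    /// ```ignore
    /// let mut jar = NippyJar::new_without_header(2, path).with_zstd(true, 5000);
    /// jar.prepare_compression(vec![col0_sample, col1_sample])?;
    /// let jar = jar.freeze(vec![col0_rows, col1_rows], num_rows)?;
    /// ```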
pub fn prepare_compression(
&mut self,
columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
) -> Result<(), NippyJarError> {
if let Some(compression) = &mut self.compressor {
debug!(target: "nippy-jar", columns=columns.len(), "Preparing compression.");
compression.prepare_compression(columns)?;
}
Ok(())
}
    /// Writes all column data to disk and commits it, returning the updated
    /// jar on success.
    pub fn freeze(
        self,
        columns: Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
        total_rows: u64,
    ) -> Result<Self, NippyJarError> {
        self.check_before_freeze(&columns)?;
        debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");
        let mut writer = NippyJarWriter::new(self)?;
        writer.append_rows(columns, total_rows)?;
        writer.commit()?;
        debug!(target: "nippy-jar", ?writer, "Finished writing data.");
        Ok(writer.into_jar())
    }

    /// Validates that the column count matches the configuration and that the
    /// compressor, if any, is ready.
    fn check_before_freeze(
        &self,
        columns: &[impl IntoIterator<Item = ColumnResult<Vec<u8>>>],
    ) -> Result<(), NippyJarError> {
        if columns.len() != self.columns {
            return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
        }
        if let Some(compression) = &self.compressor {
            if !compression.is_ready() {
                return Err(NippyJarError::CompressorNotReady)
            }
        }
        Ok(())
    }
}
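
/// Manages read access to the data and offsets files of a [`NippyJar`]
/// through memory maps.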
#[derive(Debug)]
pub struct DataReader {
    /// Data file descriptor. Needs to be kept alive as long as `data_mmap`.
    #[allow(dead_code)]
    data_file: File,
    /// Memory-mapped data file.
    data_mmap: Mmap,
    /// Offsets file descriptor. Needs to be kept alive as long as `offset_mmap`.
    #[allow(dead_code)]
    offset_file: File,
    /// Memory-mapped offsets file.
    offset_mmap: Mmap,
    /// Number of bytes that compose one offset.
    offset_size: u8,
}
impl DataReader {
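    /// Opens and memory-maps the data file at `path` and the offsets file next
    /// to it (`.off`), validating the offset size stored in the first byte.
    ///
    /// The expected layout, as read back by this type (each offset is
    /// `offset_size` bytes, little-endian; the final offset equals the
    /// data-file length):
    ///
    /// ```text
    /// offsets: [ offset_size: u8 ][ offset 0 ][ offset 1 ] ... [ data length ]
    /// data:    [ row 0, col 0 ][ row 0, col 1 ] ... [ row N, col M ]
    /// ```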
    pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
        let data_file = File::open(path.as_ref())?;
        // SAFETY: file-backed map; the `File` handle is stored alongside it,
        // and the file must not be truncated while the map is alive.
        let data_mmap = unsafe { Mmap::map(&data_file)? };
        let offset_file = File::open(path.as_ref().with_extension(OFFSETS_FILE_EXTENSION))?;
        // SAFETY: same as above.
        let offset_mmap = unsafe { Mmap::map(&offset_file)? };
        // The first byte of the offsets file encodes how many bytes each
        // offset occupies; offsets are read into a `u64`, so only 1..=8 is valid.
        let offset_size = offset_mmap[0];
        if offset_size > 8 {
            return Err(NippyJarError::OffsetSizeTooBig { offset_size })
        } else if offset_size == 0 {
            return Err(NippyJarError::OffsetSizeTooSmall { offset_size })
        }
        Ok(Self { data_file, data_mmap, offset_file, offset_size, offset_mmap })
    }
    /// Returns the byte offset in the data file for the value at `index`,
    /// where values are indexed row by row across columns. The leading byte of
    /// the offsets file stores the offset size, hence the `+ 1`.
    pub fn offset(&self, index: usize) -> Result<u64, NippyJarError> {
        let from = index * self.offset_size as usize + 1;
        self.offset_at(from)
    }

    /// Returns the offset `index` positions from the end of the offsets file,
    /// or `0` if the file holds no offsets yet. `reverse_offset(0)` is the
    /// total length of the data file.
    pub fn reverse_offset(&self, index: usize) -> Result<u64, NippyJarError> {
        let offsets_file_size = self.offset_file.metadata()?.len() as usize;
        if offsets_file_size > 1 {
            let from = offsets_file_size - self.offset_size as usize * (index + 1);
            self.offset_at(from)
        } else {
            Ok(0)
        }
    }
    /// Returns the number of offsets stored in the offsets file, excluding the
    /// leading offset-size byte.
    pub fn offsets_count(&self) -> Result<usize, NippyJarError> {
        Ok((self.offset_file.metadata()?.len().saturating_sub(1) / self.offset_size as u64)
            as usize)
    }

    /// Reads one offset-sized, little-endian value starting at byte `index` of
    /// the offsets file.
    fn offset_at(&self, index: usize) -> Result<u64, NippyJarError> {
        let mut buffer: [u8; 8] = [0; 8];
        let offset_end = index.saturating_add(self.offset_size as usize);
        if offset_end > self.offset_mmap.len() {
            return Err(NippyJarError::OffsetOutOfBounds { index })
        }
        buffer[..self.offset_size as usize].copy_from_slice(&self.offset_mmap[index..offset_end]);
        Ok(u64::from_le_bytes(buffer))
    }
    /// Returns the number of bytes each offset occupies.
    pub const fn offset_size(&self) -> u8 {
        self.offset_size
    }

    /// Returns the raw bytes within `range` of the data file. Panics if the
    /// range is out of bounds of the mapped data.
    pub fn data(&self, range: Range<usize>) -> &[u8] {
        &self.data_mmap[range]
    }

    /// Returns the total size of the data file in bytes.
    pub fn size(&self) -> usize {
        self.data_mmap.len()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use compression::Compression;
use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
use std::{fs::OpenOptions, io::Read};
type ColumnResults<T> = Vec<ColumnResult<T>>;
type ColumnValues = Vec<Vec<u8>>;
    /// Generates two columns of 100 random 32-byte values, optionally seeded
    /// for reproducibility.
    fn test_data(seed: Option<u64>) -> (ColumnValues, ColumnValues) {
        let value_length = 32;
        let num_rows = 100;
        let mut vec: Vec<u8> = vec![0; value_length];
        let mut rng = seed.map(SmallRng::seed_from_u64).unwrap_or_else(SmallRng::from_entropy);
        let mut generate = || {
            (0..num_rows)
                .map(|_| {
                    rng.fill_bytes(&mut vec[..]);
                    vec.clone()
                })
                .collect()
        };
        (generate(), generate())
    }
    /// Wraps each column value in `Ok`, matching the iterator item type that
    /// [`NippyJar::freeze`] expects.
    fn clone_with_result(col: &ColumnValues) -> ColumnResults<Vec<u8>> {
        col.iter().map(|v| Ok(v.clone())).collect()
    }
#[test]
fn test_config_serialization() {
let file = tempfile::NamedTempFile::new().unwrap();
let jar = NippyJar::new_without_header(23, file.path()).with_lz4();
jar.freeze_config().unwrap();
let mut config_file = OpenOptions::new().read(true).open(jar.config_path()).unwrap();
        let config_file_len = config_file.metadata().unwrap().len();
        // bincode layout: version (8) + columns (8) + rows (8) + `Some` tag (1)
        // + compressor variant (4) + max_row_size (8) = 37 bytes.
        assert_eq!(config_file_len, 37);
let mut buf = Vec::with_capacity(config_file_len as usize);
config_file.read_to_end(&mut buf).unwrap();
assert_eq!(
vec![
1, 0, 0, 0, 0, 0, 0, 0, 23, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0
],
buf
);
let mut read_jar = bincode::deserialize_from::<_, NippyJar>(&buf[..]).unwrap();
read_jar.path = file.path().to_path_buf();
assert_eq!(jar, read_jar);
}
#[test]
fn test_zstd_with_dictionaries() {
let (col1, col2) = test_data(None);
let num_rows = col1.len() as u64;
let num_columns = 2;
let file_path = tempfile::NamedTempFile::new().unwrap();
let nippy = NippyJar::new_without_header(num_columns, file_path.path());
assert!(nippy.compressor().is_none());
let mut nippy =
NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
assert!(nippy.compressor().is_some());
if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor_mut() {
assert!(matches!(zstd.compressors(), Err(NippyJarError::CompressorNotReady)));
assert!(matches!(
zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]),
Err(NippyJarError::ColumnLenMismatch(columns, 3)) if columns == num_columns
));
}
assert!(matches!(
nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows),
Err(NippyJarError::CompressorNotReady)
));
let mut nippy =
NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
assert!(nippy.compressor().is_some());
nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap();
if let Some(Compressors::Zstd(zstd)) = &nippy.compressor() {
assert!(matches!(
(&zstd.state, zstd.dictionaries.as_ref().map(|dict| dict.len())),
(compression::ZstdState::Ready, Some(columns)) if columns == num_columns
));
}
let nippy = nippy
.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
.unwrap();
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
assert_eq!(nippy.version, loaded_nippy.version);
assert_eq!(nippy.columns, loaded_nippy.columns);
assert_eq!(nippy.filter, loaded_nippy.filter);
assert_eq!(nippy.phf, loaded_nippy.phf);
assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
assert_eq!(nippy.path, loaded_nippy.path);
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
assert!(zstd.use_dict);
let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
let mut row_index = 0usize;
while let Some(row) = cursor.next_row().unwrap() {
assert_eq!(
(row[0], row[1]),
(col1[row_index].as_slice(), col2[row_index].as_slice())
);
row_index += 1;
}
} else {
panic!("Expected Zstd compressor")
}
}
#[test]
fn test_lz4() {
let (col1, col2) = test_data(None);
let num_rows = col1.len() as u64;
let num_columns = 2;
let file_path = tempfile::NamedTempFile::new().unwrap();
let nippy = NippyJar::new_without_header(num_columns, file_path.path());
assert!(nippy.compressor().is_none());
let nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_lz4();
assert!(nippy.compressor().is_some());
let nippy = nippy
.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
.unwrap();
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
assert_eq!(nippy, loaded_nippy);
if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
let mut row_index = 0usize;
while let Some(row) = cursor.next_row().unwrap() {
assert_eq!(
(row[0], row[1]),
(col1[row_index].as_slice(), col2[row_index].as_slice())
);
row_index += 1;
}
} else {
panic!("Expected Lz4 compressor")
}
}
#[test]
fn test_zstd_no_dictionaries() {
let (col1, col2) = test_data(None);
let num_rows = col1.len() as u64;
let num_columns = 2;
let file_path = tempfile::NamedTempFile::new().unwrap();
let nippy = NippyJar::new_without_header(num_columns, file_path.path());
assert!(nippy.compressor().is_none());
let nippy =
NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
assert!(nippy.compressor().is_some());
let nippy = nippy
.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
.unwrap();
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
assert_eq!(nippy, loaded_nippy);
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
assert!(!zstd.use_dict);
let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
let mut row_index = 0usize;
while let Some(row) = cursor.next_row().unwrap() {
assert_eq!(
(row[0], row[1]),
(col1[row_index].as_slice(), col2[row_index].as_slice())
);
row_index += 1;
}
} else {
panic!("Expected Zstd compressor")
}
}
#[test]
fn test_full_nippy_jar() {
let (col1, col2) = test_data(None);
let num_rows = col1.len() as u64;
let num_columns = 2;
let file_path = tempfile::NamedTempFile::new().unwrap();
let data = vec![col1.clone(), col2.clone()];
let block_start = 500;
#[derive(Serialize, Deserialize, Debug)]
struct BlockJarHeader {
block_start: usize,
}
{
let mut nippy =
NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
.with_zstd(true, 5000);
nippy.prepare_compression(data.clone()).unwrap();
nippy
.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
.unwrap();
}
{
let loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();
assert!(loaded_nippy.compressor().is_some());
assert_eq!(loaded_nippy.user_header().block_start, block_start);
if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
let mut row_num = 0usize;
while let Some(row) = cursor.next_row().unwrap() {
assert_eq!(
(row[0], row[1]),
(data[0][row_num].as_slice(), data[1][row_num].as_slice())
);
row_num += 1;
}
let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
data.shuffle(&mut rand::thread_rng());
for (row_num, (v0, v1)) in data {
let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (v0, v1));
}
}
}
}
#[test]
fn test_selectable_column_values() {
let (col1, col2) = test_data(None);
let num_rows = col1.len() as u64;
let num_columns = 2;
let file_path = tempfile::NamedTempFile::new().unwrap();
let data = vec![col1.clone(), col2.clone()];
{
let mut nippy =
NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
nippy.prepare_compression(data).unwrap();
nippy
.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
.unwrap();
}
{
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
data.shuffle(&mut rand::thread_rng());
                // Each set bit in the mask selects the column at that bit position.
                const BLOCKS_FULL_MASK: usize = 0b11;
for (row_num, (v0, v1)) in &data {
let row_by_num = cursor
.row_by_number_with_cols(*row_num, BLOCKS_FULL_MASK)
.unwrap()
.unwrap();
assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (*v0, *v1));
}
const BLOCKS_BLOCK_MASK: usize = 0b01;
for (row_num, (v0, _)) in &data {
let row_by_num = cursor
.row_by_number_with_cols(*row_num, BLOCKS_BLOCK_MASK)
.unwrap()
.unwrap();
assert_eq!(row_by_num.len(), 1);
assert_eq!(&row_by_num[0].to_vec(), *v0);
}
const BLOCKS_WITHDRAWAL_MASK: usize = 0b10;
for (row_num, (_, v1)) in &data {
let row_by_num = cursor
.row_by_number_with_cols(*row_num, BLOCKS_WITHDRAWAL_MASK)
.unwrap()
.unwrap();
assert_eq!(row_by_num.len(), 1);
assert_eq!(&row_by_num[0].to_vec(), *v1);
}
const BLOCKS_EMPTY_MASK: usize = 0b00;
for (row_num, _) in &data {
assert!(cursor
.row_by_number_with_cols(*row_num, BLOCKS_EMPTY_MASK)
.unwrap()
.unwrap()
.is_empty());
}
}
}
}
#[test]
fn test_writer() {
let (col1, col2) = test_data(None);
let num_columns = 2;
let file_path = tempfile::NamedTempFile::new().unwrap();
append_two_rows(num_columns, file_path.path(), &col1, &col2);
prune_rows(num_columns, file_path.path(), &col1, &col2);
append_two_rows(num_columns, file_path.path(), &col1, &col2);
test_append_consistency_no_commit(file_path.path(), &col1, &col2);
test_append_consistency_partial_commit(file_path.path(), &col1, &col2);
}
#[test]
fn test_pruner() {
let (col1, col2) = test_data(None);
let num_columns = 2;
        let num_rows = 2;
        // Each scenario truncates N 32-byte values from the data file and
        // lists how many rows should survive the consistency check on reload.
        let missing_offsets_scenarios = [(1, 1), (2, 1), (3, 0)];
for (missing_offsets, expected_rows) in missing_offsets_scenarios {
let file_path = tempfile::NamedTempFile::new().unwrap();
append_two_rows(num_columns, file_path.path(), &col1, &col2);
simulate_interrupted_prune(num_columns, file_path.path(), num_rows, missing_offsets);
let nippy = NippyJar::load_without_header(file_path.path()).unwrap();
assert_eq!(nippy.rows, expected_rows);
}
}
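
    /// Appends a row but commits only part of its offsets, then verifies that
    /// a fresh writer restores the offsets and data files to the last fully
    /// committed row.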
fn test_append_consistency_partial_commit(
file_path: &Path,
col1: &[Vec<u8>],
col2: &[Vec<u8>],
) {
let nippy = NippyJar::load_without_header(file_path).unwrap();
let initial_rows = nippy.rows;
let initial_data_size =
File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
let initial_offset_size =
File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
assert!(initial_data_size > 0);
assert!(initial_offset_size > 0);
let mut writer = NippyJarWriter::new(nippy).unwrap();
writer.append_column(Some(Ok(&col1[2]))).unwrap();
writer.append_column(Some(Ok(&col2[2]))).unwrap();
let _ = writer.offsets_mut().pop();
writer.commit_offsets().unwrap();
drop(writer);
let nippy = NippyJar::load_without_header(file_path).unwrap();
assert_eq!(initial_rows, nippy.rows);
let new_data_size =
File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());
assert_eq!(
initial_offset_size + 8,
File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
);
let writer = NippyJarWriter::new(nippy).unwrap();
assert_eq!(initial_rows, writer.rows());
assert_eq!(
initial_offset_size,
File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize
);
assert_eq!(
initial_data_size,
File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
);
}
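
    /// Appends a row without committing it and verifies that a fresh writer
    /// rolls the data file back to the last committed state.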
fn test_append_consistency_no_commit(file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
let nippy = NippyJar::load_without_header(file_path).unwrap();
let initial_rows = nippy.rows;
let initial_data_size =
File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
let initial_offset_size =
File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
assert!(initial_data_size > 0);
assert!(initial_offset_size > 0);
let mut writer = NippyJarWriter::new(nippy).unwrap();
writer.append_column(Some(Ok(&col1[2]))).unwrap();
writer.append_column(Some(Ok(&col2[2]))).unwrap();
drop(writer);
let nippy = NippyJar::load_without_header(file_path).unwrap();
assert_eq!(initial_rows, nippy.rows);
let new_data_size =
File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());
assert_eq!(
initial_offset_size,
File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
);
let writer = NippyJarWriter::new(nippy).unwrap();
assert_eq!(initial_rows, writer.rows());
assert_eq!(
initial_data_size,
File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
);
}
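
    /// Writes the first two rows in two separate writer sessions, checking the
    /// offsets-file length (1 offset-size byte, one 8-byte offset per value,
    /// plus the final data-length offset) after each commit.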
fn append_two_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
{
let nippy = NippyJar::new_without_header(num_columns, file_path);
nippy.freeze_config().unwrap();
assert_eq!(nippy.max_row_size, 0);
assert_eq!(nippy.rows, 0);
let mut writer = NippyJarWriter::new(nippy).unwrap();
assert_eq!(writer.column(), 0);
writer.append_column(Some(Ok(&col1[0]))).unwrap();
assert_eq!(writer.column(), 1);
assert!(writer.is_dirty());
writer.append_column(Some(Ok(&col2[0]))).unwrap();
assert!(writer.is_dirty());
assert_eq!(writer.column(), 0);
assert_eq!(writer.offsets().len(), 3);
let expected_data_file_size = *writer.offsets().last().unwrap();
writer.commit().unwrap();
assert!(!writer.is_dirty());
assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
assert_eq!(writer.rows(), 1);
assert_eq!(
File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
1 + num_columns as u64 * 8 + 8
);
assert_eq!(
File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
expected_data_file_size
);
}
{
let nippy = NippyJar::load_without_header(file_path).unwrap();
assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
assert_eq!(nippy.rows, 1);
let mut writer = NippyJarWriter::new(nippy).unwrap();
assert_eq!(writer.column(), 0);
writer.append_column(Some(Ok(&col1[1]))).unwrap();
assert_eq!(writer.column(), 1);
writer.append_column(Some(Ok(&col2[1]))).unwrap();
assert_eq!(writer.column(), 0);
assert_eq!(writer.offsets().len(), 3);
let expected_data_file_size = *writer.offsets().last().unwrap();
writer.commit().unwrap();
assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
assert_eq!(writer.rows(), 2);
assert_eq!(
File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
);
assert_eq!(
File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
expected_data_file_size
);
}
}
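
    /// Prunes rows while an uncommitted row is pending, checking that the row
    /// count, offsets file, and data file shrink accordingly.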
fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
let nippy = NippyJar::load_without_header(file_path).unwrap();
let mut writer = NippyJarWriter::new(nippy).unwrap();
writer.append_column(Some(Ok(&col1[2]))).unwrap();
writer.append_column(Some(Ok(&col2[2]))).unwrap();
assert!(writer.is_dirty());
writer.prune_rows(2).unwrap();
assert_eq!(writer.rows(), 1);
assert_eq!(
File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
);
let expected_data_size = col1[0].len() + col2[0].len();
assert_eq!(
File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize,
expected_data_size
);
let nippy = NippyJar::load_without_header(file_path).unwrap();
{
let data_reader = nippy.open_data_reader().unwrap();
assert_eq!(data_reader.offset(2).unwrap(), expected_data_size as u64);
}
let mut writer = NippyJarWriter::new(nippy).unwrap();
writer.prune_rows(1).unwrap();
assert!(writer.is_dirty());
assert_eq!(writer.rows(), 0);
assert_eq!(writer.max_row_size(), 0);
assert_eq!(File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize, 0);
assert_eq!(
File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize,
1
);
writer.commit().unwrap();
assert!(!writer.is_dirty());
}
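
    /// Truncates `missing_offsets` 32-byte values from the end of the data
    /// file, mimicking a prune interrupted before data and offsets were both
    /// updated. Reopening the writer afterwards triggers the consistency
    /// checks that repair the jar.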
fn simulate_interrupted_prune(
num_columns: usize,
file_path: &Path,
num_rows: u64,
missing_offsets: u64,
) {
let nippy = NippyJar::load_without_header(file_path).unwrap();
let reader = nippy.open_data_reader().unwrap();
let offsets_file =
OpenOptions::new().read(true).write(true).open(nippy.offsets_path()).unwrap();
let offsets_len = 1 + num_rows * num_columns as u64 * 8 + 8;
assert_eq!(offsets_len, offsets_file.metadata().unwrap().len());
let data_file = OpenOptions::new().read(true).write(true).open(nippy.data_path()).unwrap();
let data_len = reader.reverse_offset(0).unwrap();
assert_eq!(data_len, data_file.metadata().unwrap().len());
data_file.set_len(data_len - 32 * missing_offsets).unwrap();
let _ = NippyJarWriter::new(nippy).unwrap();
}
}