#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![allow(missing_docs)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
error::Error as StdError,
fs::{File, OpenOptions},
ops::Range,
path::{Path, PathBuf},
};
use sucds::{int_vectors::PrefixSummedEliasFano, Serializable};
use tracing::*;
pub mod filter;
use filter::{Cuckoo, InclusionFilter, InclusionFilters};
pub mod compression;
#[cfg(test)]
use compression::Compression;
use compression::Compressors;
pub mod phf;
pub use phf::PHFKey;
use phf::{Fmph, Functions, GoFmph, PerfectHashingFunction};
mod error;
pub use error::NippyJarError;
mod cursor;
pub use cursor::NippyJarCursor;
mod writer;
pub use writer::{ConsistencyFailStrategy, NippyJarWriter};
/// Format version written into the `version` field of every jar created by `NippyJar::new`.
const NIPPY_JAR_VERSION: usize = 1;
/// Extension of the index file (see `NippyJar::index_path`); it is written by `freeze_filters`
/// and read back by `load_filters`.
const INDEX_FILE_EXTENSION: &str = "idx";
/// Extension of the offsets file (see `NippyJar::offsets_path`), which `DataReader` memory-maps.
const OFFSETS_FILE_EXTENSION: &str = "off";
/// Extension of the configuration file that holds the bincode-serialized `NippyJar`.
const CONFIG_FILE_EXTENSION: &str = "conf";
/// A row expressed as a list of borrowed byte slices.
// NOTE(review): presumably one slice per selected column — confirm against the cursor module.
type RefRow<'a> = Vec<&'a [u8]>;
/// Result of producing a single column value; the error is any boxed, thread-safe `std` error.
pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;
/// Marker trait for the user-defined header stored inside a `NippyJar`.
///
/// It declares no methods. It only bundles the bounds a header type must satisfy: `Send + Sync`,
/// serde `Serialize`, `Deserialize` for every lifetime, `Debug`, and `'static`.
pub trait NippyJarHeader:
Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}
/// Blanket implementation: any type that meets the bounds is automatically a `NippyJarHeader`,
/// so callers never implement the trait by hand.
impl<T> NippyJarHeader for T where
T: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}
/// Configuration and in-memory indexes of a jar, generic over a user-defined header `H`.
///
/// The serde-serialized fields form the configuration file (see `freeze_config` / `load`).
/// Fields marked `#[serde(skip)]` are not part of that file: `filter`, `phf` and `offsets_index`
/// are read from the index file by `load_filters`, and `path` is set by `new` / `load`.
///
/// The declaration order of the serialized fields is the on-disk order, so do not reorder them.
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NippyJar<H = ()> {
/// Format version; `new` sets it to `NIPPY_JAR_VERSION`.
version: usize,
/// Caller-supplied header stored alongside the configuration.
user_header: H,
/// Number of columns; `check_before_freeze` rejects input with a different column count.
columns: usize,
/// Number of rows; starts at `0` in `new`.
// NOTE(review): presumably maintained by `NippyJarWriter` on commit/prune — confirm in writer.rs.
rows: usize,
/// Optional compressor, selected with `with_zstd` or `with_lz4`.
compressor: Option<Compressors>,
/// Optional inclusion filter, selected with `with_cuckoo_filter`. Not in the config file.
#[serde(skip)]
filter: Option<InclusionFilters>,
/// Optional perfect hashing function, selected with `with_fmph` or `with_gofmph`. Not in the
/// config file.
#[serde(skip)]
phf: Option<Functions>,
/// Elias-Fano encoded list built by `prepare_index`, where the slot at a key's PHF index holds
/// that key's row number. Not in the config file.
#[serde(skip)]
offsets_index: PrefixSummedEliasFano,
/// Starts at `0` in `new`.
// NOTE(review): presumably the largest row size seen by the writer — confirm in writer.rs.
max_row_size: usize,
/// Path of the data file; the index, offsets and config paths are derived from it by swapping
/// the extension. Not in the config file.
#[serde(skip)]
path: PathBuf,
}
impl<H: NippyJarHeader> std::fmt::Debug for NippyJar<H> {
    /// Formats the jar for diagnostics.
    ///
    /// The offsets index is summarized by its element count and serialized size instead of being
    /// dumped, and the output is marked non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let offsets_len = self.offsets_index.len();
        let offsets_bytes = self.offsets_index.size_in_bytes();

        let mut out = f.debug_struct("NippyJar");
        out.field("version", &self.version);
        out.field("user_header", &self.user_header);
        out.field("rows", &self.rows);
        out.field("columns", &self.columns);
        out.field("compressor", &self.compressor);
        out.field("filter", &self.filter);
        out.field("phf", &self.phf);
        out.field("offsets_index (len)", &offsets_len);
        out.field("offsets_index (size in bytes)", &offsets_bytes);
        out.field("path", &self.path);
        out.field("max_row_size", &self.max_row_size);
        out.finish_non_exhaustive()
    }
}
impl NippyJar<()> {
    /// Creates a new jar with `columns` columns at `path`, without a user-defined header.
    pub fn new_without_header(columns: usize, path: &Path) -> Self {
        Self::new(columns, path, ())
    }

    /// Loads the configuration of a jar that was stored without a user-defined header.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`NippyJar::load`].
    pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
        Self::load(path)
    }

    /// Returns `true` when the jar has an inclusion filter or a perfect hashing function.
    ///
    /// Either one on its own counts. This matches the condition `prepare_index` uses to decide
    /// whether the filter and offsets index need to be built, so a jar configured with only a
    /// filter (or only a PHF) is still reported as using filters.
    pub const fn uses_filters(&self) -> bool {
        self.filter.is_some() || self.phf.is_some()
    }
}
impl<H: NippyJarHeader> NippyJar<H> {
    /// Creates an empty jar description: zero rows, no compressor, no filter, no perfect hashing
    /// function and a default offsets index. Nothing is written to disk here.
    pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
        Self {
            path: path.to_path_buf(),
            version: NIPPY_JAR_VERSION,
            user_header,
            columns,
            rows: 0,
            max_row_size: 0,
            offsets_index: PrefixSummedEliasFano::default(),
            compressor: None,
            filter: None,
            phf: None,
        }
    }

    /// Selects zstd compression, configured with `use_dict`, `max_dict_size` and the jar's
    /// column count.
    pub fn with_zstd(mut self, use_dict: bool, max_dict_size: usize) -> Self {
        let zstd = compression::Zstd::new(use_dict, max_dict_size, self.columns);
        self.compressor = Some(Compressors::Zstd(zstd));
        self
    }

    /// Selects lz4 compression with its default configuration.
    pub fn with_lz4(mut self) -> Self {
        let lz4 = compression::Lz4::default();
        self.compressor = Some(Compressors::Lz4(lz4));
        self
    }

    /// Adds a cuckoo inclusion filter created with the given `max_capacity`.
    pub fn with_cuckoo_filter(mut self, max_capacity: usize) -> Self {
        let cuckoo = Cuckoo::new(max_capacity);
        self.filter = Some(InclusionFilters::Cuckoo(cuckoo));
        self
    }

    /// Adds the `Fmph` perfect hashing function.
    pub fn with_fmph(mut self) -> Self {
        let fmph = Fmph::new();
        self.phf = Some(Functions::Fmph(fmph));
        self
    }

    /// Adds the `GoFmph` perfect hashing function.
    pub fn with_gofmph(mut self) -> Self {
        let gofmph = GoFmph::new();
        self.phf = Some(Functions::GoFmph(gofmph));
        self
    }

    /// Returns the user-defined header.
    pub const fn user_header(&self) -> &H {
        &self.user_header
    }

    /// Returns the number of columns.
    pub const fn columns(&self) -> usize {
        self.columns
    }

    /// Returns the number of rows.
    pub const fn rows(&self) -> usize {
        self.rows
    }

    /// Returns the size reported by the inclusion filter, or `0` when no filter is set.
    pub fn filter_size(&self) -> usize {
        InclusionFilter::size(self)
    }

    /// Returns the serialized size of the offsets index in bytes.
    pub fn offsets_index_size(&self) -> usize {
        self.offsets_index.size_in_bytes()
    }

    /// Returns the configured compressor, if any.
    pub const fn compressor(&self) -> Option<&Compressors> {
        self.compressor.as_ref()
    }

    /// Returns the configured compressor mutably, if any.
    pub fn compressor_mut(&mut self) -> Option<&mut Compressors> {
        self.compressor.as_mut()
    }

    /// Loads the jar configuration stored next to `path` (same name, config extension).
    ///
    /// Fields that are skipped during serialization keep their default values; call
    /// [`Self::load_filters`] to populate the filter, the PHF and the offsets index.
    ///
    /// # Errors
    ///
    /// Fails when the config file cannot be opened or cannot be deserialized.
    pub fn load(path: &Path) -> Result<Self, NippyJarError> {
        let config_path = path.with_extension(CONFIG_FILE_EXTENSION);
        let config_file = match File::open(&config_path) {
            Ok(file) => file,
            Err(err) => return Err(reth_fs_util::FsPathError::open(err, config_path).into()),
        };

        let mut jar: Self = bincode::deserialize_from(&config_file)?;
        // The path is not part of the serialized configuration.
        jar.path = path.to_path_buf();
        Ok(jar)
    }

    /// Reads the index file and fills in, in this order, the offsets index, the perfect hashing
    /// function and the inclusion filter.
    ///
    /// # Errors
    ///
    /// Fails when the index file cannot be opened or any of the three parts fails to decode.
    /// Parts decoded before the failure stay assigned.
    pub fn load_filters(&mut self) -> Result<(), NippyJarError> {
        let mut index_file = File::open(self.index_path())?;
        self.offsets_index = PrefixSummedEliasFano::deserialize_from(&mut index_file)?;
        self.phf = bincode::deserialize_from(&mut index_file)?;
        self.filter = bincode::deserialize_from(&mut index_file)?;
        Ok(())
    }

    /// Returns the path of the data file.
    pub fn data_path(&self) -> &Path {
        self.path.as_path()
    }

    /// Returns the path of the index file (data path with the index extension).
    pub fn index_path(&self) -> PathBuf {
        self.path.with_extension(INDEX_FILE_EXTENSION)
    }

    /// Returns the path of the offsets file (data path with the offsets extension).
    pub fn offsets_path(&self) -> PathBuf {
        self.path.with_extension(OFFSETS_FILE_EXTENSION)
    }

    /// Returns the path of the config file (data path with the config extension).
    pub fn config_path(&self) -> PathBuf {
        self.path.with_extension(CONFIG_FILE_EXTENSION)
    }

    /// Deletes the data, index, offsets and config files, skipping those that do not exist.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first removal failure.
    pub fn delete(self) -> Result<(), NippyJarError> {
        let files: [PathBuf; 4] =
            [self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()];

        for file in files {
            if !file.exists() {
                continue
            }
            reth_fs_util::remove_file(file)?;
        }
        Ok(())
    }

    /// Opens a [`DataReader`] over this jar's data file and its offsets file.
    pub fn open_data_reader(&self) -> Result<DataReader, NippyJarError> {
        DataReader::new(self.data_path())
    }

    /// Persists the configuration: writes it to a temporary file, syncs that file, renames it
    /// over the config path and finally syncs the parent directory.
    fn freeze_config(&self) -> Result<(), NippyJarError> {
        let config_path = self.config_path();

        // The extension argument carries its own dot, so the temporary file name ends in
        // `..tmp`.
        let mut tmp_path = config_path.clone();
        tmp_path.set_extension(".tmp");

        let mut tmp_file = File::create(&tmp_path)?;
        bincode::serialize_into(&mut tmp_file, &self)?;
        tmp_file.sync_all()?;

        reth_fs_util::rename(&tmp_path, config_path)?;

        // Sync the directory entry as well, so the rename itself is flushed.
        if let Some(dir) = tmp_path.parent() {
            OpenOptions::new().read(true).open(dir)?.sync_all()?;
        }
        Ok(())
    }
}
impl<H: NippyJarHeader> InclusionFilter for NippyJar<H> {
    /// Adds `element` to the configured filter.
    ///
    /// Returns `NippyJarError::FilterMissing` when the jar has no filter.
    fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> {
        match self.filter.as_mut() {
            Some(filter) => filter.add(element),
            None => Err(NippyJarError::FilterMissing),
        }
    }

    /// Asks the configured filter whether it contains `element`.
    ///
    /// Returns `NippyJarError::FilterMissing` when the jar has no filter.
    fn contains(&self, element: &[u8]) -> Result<bool, NippyJarError> {
        match self.filter.as_ref() {
            Some(filter) => filter.contains(element),
            None => Err(NippyJarError::FilterMissing),
        }
    }

    /// Size reported by the configured filter, or `0` when the jar has no filter.
    fn size(&self) -> usize {
        self.filter.as_ref().map_or(0, |filter| filter.size())
    }
}
impl<H: NippyJarHeader> PerfectHashingFunction for NippyJar<H> {
    /// Forwards `keys` to the configured perfect hashing function.
    ///
    /// Returns `NippyJarError::PHFMissing` when the jar has none.
    fn set_keys<T: PHFKey>(&mut self, keys: &[T]) -> Result<(), NippyJarError> {
        match self.phf.as_mut() {
            Some(phf) => phf.set_keys(keys),
            None => Err(NippyJarError::PHFMissing),
        }
    }

    /// Looks up the index of `key` in the configured perfect hashing function.
    ///
    /// Returns `NippyJarError::PHFMissing` when the jar has none.
    fn get_index(&self, key: &[u8]) -> Result<Option<u64>, NippyJarError> {
        match self.phf.as_ref() {
            Some(phf) => phf.get_index(key),
            None => Err(NippyJarError::PHFMissing),
        }
    }
}
#[cfg(test)]
impl<H: NippyJarHeader> NippyJar<H> {
    /// Hands `columns` to the configured compressor so it can prepare itself.
    /// Does nothing when the jar has no compressor.
    pub fn prepare_compression(
        &mut self,
        columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
    ) -> Result<(), NippyJarError> {
        if let Some(compressor) = self.compressor.as_mut() {
            debug!(target: "nippy-jar", columns=columns.len(), "Preparing compression.");
            compressor.prepare_compression(columns)?;
        }
        Ok(())
    }

    /// Builds the perfect hashing function, the inclusion filter and the offsets index from
    /// `values`, which yields one key per row in row order.
    ///
    /// The offsets index has `row_count` slots; the slot at a key's PHF index receives that
    /// key's row number. Slots stay `0` when no PHF is configured.
    ///
    /// # Errors
    ///
    /// Fails on the first `Err` item in `values`, or when the PHF, the filter or the offsets
    /// index encoding fails.
    ///
    /// # Panics
    ///
    /// Panics in debug builds when `row_count` differs from the number of values, and whenever
    /// the PHF has no index for a key or returns one that is `>= row_count`.
    pub fn prepare_index<T: PHFKey>(
        &mut self,
        values: impl IntoIterator<Item = ColumnResult<T>>,
        row_count: usize,
    ) -> Result<(), NippyJarError> {
        debug!(target: "nippy-jar", ?row_count, "Preparing index.");

        let keys = values.into_iter().collect::<Result<Vec<_>, _>>()?;
        debug_assert!(
            row_count == keys.len(),
            "Row count ({row_count}) differs from value list count ({}).",
            keys.len()
        );

        let mut row_by_hash = vec![0u64; row_count];

        // The PHF must know every key before any index can be queried.
        if let Some(phf) = self.phf.as_mut() {
            debug!(target: "nippy-jar", ?row_count, values_count = ?keys.len(), "Setting keys for perfect hashing function.");
            phf.set_keys(&keys)?;
        }

        let needs_index = self.filter.is_some() || self.phf.is_some();
        if needs_index {
            debug!(target: "nippy-jar", ?row_count, "Creating filter and offsets_index.");
            for (row_num, key) in keys.into_iter().enumerate() {
                if let Some(filter) = self.filter.as_mut() {
                    filter.add(key.as_ref())?;
                }
                if let Some(phf) = self.phf.as_mut() {
                    let index = phf.get_index(key.as_ref())?.expect("initialized") as usize;
                    row_by_hash[index] = row_num as u64;
                }
            }
        }

        debug!(target: "nippy-jar", ?row_count, "Encoding offsets index list.");
        self.offsets_index = PrefixSummedEliasFano::from_slice(&row_by_hash)?;
        Ok(())
    }

    /// Writes the whole jar to disk and returns it.
    ///
    /// Validates the input, writes the index file, then appends `total_rows` rows from
    /// `columns` through a `NippyJarWriter` and commits.
    pub fn freeze(
        self,
        columns: Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
        total_rows: u64,
    ) -> Result<Self, NippyJarError> {
        self.check_before_freeze(&columns)?;

        debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");

        // The index file goes first; the writer below takes ownership of the jar.
        self.freeze_filters()?;

        let mut writer = NippyJarWriter::new(self, ConsistencyFailStrategy::Heal)?;
        writer.append_rows(columns, total_rows)?;
        writer.commit()?;

        debug!(target: "nippy-jar", ?writer, "Finished writing data.");

        Ok(writer.into_jar())
    }

    /// Writes the offsets index, the PHF and the filter, in that order, to the index file.
    /// This is the order `load_filters` reads them back in.
    fn freeze_filters(&self) -> Result<(), NippyJarError> {
        debug!(target: "nippy-jar", path=?self.index_path(), "Writing offsets and offsets index to file.");

        let mut index_file = File::create(self.index_path())?;
        self.offsets_index.serialize_into(&mut index_file)?;
        bincode::serialize_into(&mut index_file, &self.phf)?;
        bincode::serialize_into(&mut index_file, &self.filter)?;
        Ok(())
    }

    /// Checks that the jar can be frozen with `columns`: the column count matches, the
    /// compressor (if any) is ready, and the PHF (if any) answers a lookup without error.
    fn check_before_freeze(
        &self,
        columns: &[impl IntoIterator<Item = ColumnResult<Vec<u8>>>],
    ) -> Result<(), NippyJarError> {
        let provided = columns.len();
        if provided != self.columns {
            return Err(NippyJarError::ColumnLenMismatch(self.columns, provided))
        }

        let compressor_pending = matches!(&self.compressor, Some(c) if !c.is_ready());
        if compressor_pending {
            return Err(NippyJarError::CompressorNotReady)
        }

        // Only the error matters here; the looked-up index is discarded.
        if let Some(phf) = self.phf.as_ref() {
            let _ = phf.get_index(&[])?;
        }

        Ok(())
    }
}
/// Read-only view over a jar's data file and offsets file through memory maps.
#[derive(Debug)]
pub struct DataReader {
/// Handle of the data file backing `data_mmap`.
// NOTE(review): presumably stored only to keep the descriptor alive for the mapping — confirm.
#[allow(dead_code)]
data_file: File,
/// Memory map of the data file; `data` and `size` read from it.
data_mmap: Mmap,
/// Handle of the offsets file backing `offset_mmap`; its metadata is also queried for the
/// current file length by `reverse_offset` and `offsets_count`.
#[allow(dead_code)]
offset_file: File,
/// Memory map of the offsets file: byte 0 is the offset size, offsets follow from byte 1.
offset_mmap: Mmap,
/// Size in bytes of one stored offset; `new` only accepts values in `1..=8`.
offset_size: u8,
}
impl DataReader {
    /// Opens and memory-maps the data file at `path` and the offsets file next to it (same
    /// name, offsets extension).
    ///
    /// # Errors
    ///
    /// - I/O errors from opening or mapping either file.
    /// - `OffsetOutOfBounds { index: 0 }` when the offsets file is empty and therefore has no
    ///   offset-size byte.
    /// - `OffsetSizeTooBig` / `OffsetSizeTooSmall` when that byte is above 8 or equal to 0.
    pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
        let data_file = File::open(path.as_ref())?;
        // SAFETY: the file is opened read-only and its handle is stored in `Self` next to the
        // mapping. NOTE(review): this still assumes nobody truncates the file while it is mapped.
        let data_mmap = unsafe { Mmap::map(&data_file)? };

        let offset_file = File::open(path.as_ref().with_extension(OFFSETS_FILE_EXTENSION))?;
        // SAFETY: same reasoning as for the data file above.
        let offset_mmap = unsafe { Mmap::map(&offset_file)? };

        // The first byte of the offsets file is the size of one offset. An empty file has no
        // such byte, so report an error instead of panicking on the index.
        let offset_size =
            *offset_mmap.first().ok_or(NippyJarError::OffsetOutOfBounds { index: 0 })?;

        // An offset is decoded into a `u64`, so it must be 1 to 8 bytes wide.
        if offset_size > 8 {
            return Err(NippyJarError::OffsetSizeTooBig { offset_size })
        } else if offset_size == 0 {
            return Err(NippyJarError::OffsetSizeTooSmall { offset_size })
        }

        Ok(Self { data_file, data_mmap, offset_file, offset_size, offset_mmap })
    }

    /// Returns the offset stored at position `index`, counted from the start of the list.
    ///
    /// # Errors
    ///
    /// `OffsetOutOfBounds` when the position lies outside the mapped offsets file, including
    /// when computing the byte position overflows.
    pub fn offset(&self, index: usize) -> Result<u64, NippyJarError> {
        // The `+ 1` skips the offset-size byte at the start of the file.
        let from = index
            .checked_mul(usize::from(self.offset_size))
            .and_then(|pos| pos.checked_add(1))
            .ok_or(NippyJarError::OffsetOutOfBounds { index })?;

        self.offset_at(from)
    }

    /// Returns the offset stored `index` entries before the end of the offsets file
    /// (`0` is the last entry). Returns `0` when the file holds no offsets.
    ///
    /// # Errors
    ///
    /// I/O errors from querying the file length, and `OffsetOutOfBounds` when `index` reaches
    /// past the start of the file.
    pub fn reverse_offset(&self, index: usize) -> Result<u64, NippyJarError> {
        let offsets_file_size = self.offset_file.metadata()?.len() as usize;
        if offsets_file_size <= 1 {
            return Ok(0)
        }

        // Checked arithmetic: an `index` that is too large would otherwise underflow, which
        // panics in debug builds and wraps in release builds.
        let from = index
            .checked_add(1)
            .and_then(|count| count.checked_mul(usize::from(self.offset_size)))
            .and_then(|back| offsets_file_size.checked_sub(back))
            .ok_or(NippyJarError::OffsetOutOfBounds { index })?;

        self.offset_at(from)
    }

    /// Returns how many whole offsets the offsets file currently holds, based on its on-disk
    /// length minus the leading offset-size byte.
    pub fn offsets_count(&self) -> Result<usize, NippyJarError> {
        let payload_len = self.offset_file.metadata()?.len().saturating_sub(1);
        Ok((payload_len / u64::from(self.offset_size)) as usize)
    }

    /// Decodes the little-endian offset that starts at byte position `index` of the mapped
    /// offsets file.
    ///
    /// # Errors
    ///
    /// `OffsetOutOfBounds` when the offset would extend past the end of the mapping.
    fn offset_at(&self, index: usize) -> Result<u64, NippyJarError> {
        let width = usize::from(self.offset_size);
        let mut buffer: [u8; 8] = [0; 8];

        let offset_end = index.saturating_add(width);
        if offset_end > self.offset_mmap.len() {
            return Err(NippyJarError::OffsetOutOfBounds { index })
        }

        // Offsets narrower than 8 bytes are zero-extended into the `u64`.
        buffer[..width].copy_from_slice(&self.offset_mmap[index..offset_end]);
        Ok(u64::from_le_bytes(buffer))
    }

    /// Returns the size in bytes of one stored offset.
    pub const fn offset_size(&self) -> u8 {
        self.offset_size
    }

    /// Returns the bytes of the data file in `range`.
    ///
    /// # Panics
    ///
    /// Panics when `range` is not within the mapped data file.
    pub fn data(&self, range: Range<usize>) -> &[u8] {
        &self.data_mmap[range]
    }

    /// Returns the length of the mapped data file in bytes.
    pub fn size(&self) -> usize {
        self.data_mmap.len()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use compression::Compression;
use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
use std::{collections::HashSet, fs::OpenOptions};
// A column whose entries are each wrapped in a `ColumnResult`, the shape `freeze` and
// `prepare_index` accept.
type ColumnResults<T> = Vec<ColumnResult<T>>;
// A raw column: one byte vector per row.
type ColumnValues = Vec<Vec<u8>>;
/// Generates two columns of 100 random 32-byte values each.
///
/// With `Some(seed)` the generator is seeded and the data is reproducible; with `None` it is
/// seeded from entropy. The first column is generated completely before the second.
fn test_data(seed: Option<u64>) -> (ColumnValues, ColumnValues) {
    const VALUE_LENGTH: usize = 32;
    const NUM_ROWS: usize = 100;

    let mut rng = match seed {
        Some(seed) => SmallRng::seed_from_u64(seed),
        None => SmallRng::from_entropy(),
    };

    // One scratch buffer is refilled for every value and cloned into the column.
    let mut scratch: Vec<u8> = vec![0; VALUE_LENGTH];
    let mut next_column = || -> ColumnValues {
        let mut column = Vec::with_capacity(NUM_ROWS);
        for _ in 0..NUM_ROWS {
            rng.fill_bytes(&mut scratch[..]);
            column.push(scratch.clone());
        }
        column
    };

    let first = next_column();
    let second = next_column();
    (first, second)
}
/// Clones every value of `col` and wraps each clone in `Ok`.
fn clone_with_result(col: &ColumnValues) -> ColumnResults<Vec<u8>> {
    let mut wrapped: ColumnResults<Vec<u8>> = Vec::with_capacity(col.len());
    for value in col {
        wrapped.push(Ok(value.clone()));
    }
    wrapped
}
// Exercises both perfect hashing functions (`with_fmph`, `with_gofmph`) through the
// `PerfectHashingFunction` impl of `NippyJar`, including a freeze/load round trip.
#[test]
fn test_phf() {
let (col1, col2) = test_data(None);
let num_columns = 2;
let num_rows = col1.len() as u64;
let file_path = tempfile::NamedTempFile::new().unwrap();
// Builds a jar without a PHF and checks that setting keys on it reports `PHFMissing`.
let create_nippy = || -> NippyJar<()> {
let mut nippy = NippyJar::new_without_header(num_columns, file_path.path());
assert!(matches!(
NippyJar::set_keys(&mut nippy, &col1),
Err(NippyJarError::PHFMissing)
));
nippy
};
// Shared checks, run once per PHF flavour.
let check_phf = |mut nippy: NippyJar<_>| {
// Looking up a key before any keys were set must report `PHFMissingKeys`.
assert!(matches!(
NippyJar::get_index(&nippy, &col1[0]),
Err(NippyJarError::PHFMissingKeys)
));
assert!(NippyJar::set_keys(&mut nippy, &col1).is_ok());
// Resolves every value of `col1` to its PHF index; each lookup must succeed with `Some`.
let collect_indexes = |nippy: &NippyJar<_>| -> Vec<u64> {
col1.iter()
.map(|value| NippyJar::get_index(nippy, value.as_slice()).unwrap().unwrap())
.collect()
};
let indexes = collect_indexes(&nippy);
// No two keys may share an index.
assert_eq!(indexes.iter().collect::<HashSet<_>>().len(), indexes.len());
// Setting the same keys again must yield the same indexes.
assert!(NippyJar::set_keys(&mut nippy, &col1).is_ok());
assert_eq!(indexes, collect_indexes(&nippy));
nippy.prepare_index(clone_with_result(&col1), col1.len()).unwrap();
nippy
.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
.unwrap();
// After a round trip through disk the loaded PHF must resolve to the same indexes.
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
loaded_nippy.load_filters().unwrap();
assert_eq!(indexes, collect_indexes(&loaded_nippy));
};
check_phf(create_nippy().with_fmph());
check_phf(create_nippy().with_gofmph());
}
/// Exercises the cuckoo inclusion filter through the `InclusionFilter` impl of `NippyJar`,
/// including a freeze/load round trip. Uses seeded data so the values are reproducible.
#[test]
fn test_filter() {
    let (col1, col2) = test_data(Some(1));
    let num_columns = 2;
    let num_rows = col1.len() as u64;
    let file_path = tempfile::NamedTempFile::new().unwrap();

    // Without a filter every filter operation reports `FilterMissing`.
    let mut nippy = NippyJar::new_without_header(num_columns, file_path.path());
    assert!(matches!(
        InclusionFilter::add(&mut nippy, &col1[0]),
        Err(NippyJarError::FilterMissing)
    ));

    nippy = nippy.with_cuckoo_filter(4);

    // The first two values: absent, then added, then present.
    for value in &col1[..2] {
        assert!(!InclusionFilter::contains(&nippy, value).unwrap());
        assert!(InclusionFilter::add(&mut nippy, value).is_ok());
        assert!(InclusionFilter::contains(&nippy, value).unwrap());
    }

    // Two more values bring the filter to the capacity it was created with.
    for value in &col1[2..4] {
        assert!(InclusionFilter::add(&mut nippy, value).is_ok());
    }

    let columns = vec![clone_with_result(&col1), clone_with_result(&col2)];
    let nippy = nippy.freeze(columns, num_rows).unwrap();

    let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
    loaded_nippy.load_filters().unwrap();
    assert_eq!(nippy, loaded_nippy);

    // The loaded filter knows the four added values and not the fifth.
    for value in &col1[..4] {
        assert!(InclusionFilter::contains(&loaded_nippy, value).unwrap());
    }
    assert!(!InclusionFilter::contains(&loaded_nippy, &col1[4]).unwrap());
}
// Exercises zstd compression with dictionaries enabled: readiness checks before freezing,
// dictionary preparation, and a freeze/load round trip read back through a cursor.
#[test]
fn test_zstd_with_dictionaries() {
let (col1, col2) = test_data(None);
let num_rows = col1.len() as u64;
let num_columns = 2;
let file_path = tempfile::NamedTempFile::new().unwrap();
let nippy = NippyJar::new_without_header(num_columns, file_path.path());
assert!(nippy.compressor().is_none());
let mut nippy =
NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
assert!(nippy.compressor().is_some());
if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor_mut() {
// Before preparation the compressors are not available.
assert!(matches!(zstd.compressors(), Err(NippyJarError::CompressorNotReady)));
// Preparing with three columns on a two-column jar must be rejected.
assert!(matches!(
zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]),
Err(NippyJarError::ColumnLenMismatch(columns, 3)) if columns == num_columns
));
}
// Freezing without a prepared compressor must fail.
assert!(matches!(
nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows),
Err(NippyJarError::CompressorNotReady)
));
// `freeze` consumed the previous jar, so start again and prepare it properly this time.
let mut nippy =
NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
assert!(nippy.compressor().is_some());
nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap();
// After preparation the state is `Ready` and there is one dictionary per column.
if let Some(Compressors::Zstd(zstd)) = &nippy.compressor() {
assert!(matches!(
(&zstd.state, zstd.dictionaries.as_ref().map(|dict| dict.len())),
(compression::ZstdState::Ready, Some(columns)) if columns == num_columns
));
}
let nippy = nippy
.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
.unwrap();
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
loaded_nippy.load_filters().unwrap();
// Compared field by field rather than with a whole-jar `assert_eq!`; the `compressor`,
// `rows` and `user_header` fields are not part of this comparison.
assert_eq!(nippy.version, loaded_nippy.version);
assert_eq!(nippy.columns, loaded_nippy.columns);
assert_eq!(nippy.filter, loaded_nippy.filter);
assert_eq!(nippy.phf, loaded_nippy.phf);
assert_eq!(nippy.offsets_index, loaded_nippy.offsets_index);
assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
assert_eq!(nippy.path, loaded_nippy.path);
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
assert!(zstd.use_dict);
let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
// Walk the rows in order; each must match the original columns at the same position.
let mut row_index = 0usize;
while let Some(row) = cursor.next_row().unwrap() {
assert_eq!(
(row[0], row[1]),
(col1[row_index].as_slice(), col2[row_index].as_slice())
);
row_index += 1;
}
} else {
panic!("Expected Zstd compressor")
}
}
/// Freezes a two-column jar with lz4 compression, loads it back and checks that a cursor
/// returns every row unchanged and in order.
#[test]
fn test_lz4() {
    let (col1, col2) = test_data(None);
    let num_rows = col1.len() as u64;
    let num_columns = 2;
    let file_path = tempfile::NamedTempFile::new().unwrap();

    // A fresh jar has no compressor until one is requested.
    let plain = NippyJar::new_without_header(num_columns, file_path.path());
    assert!(plain.compressor().is_none());

    let jar = NippyJar::new_without_header(num_columns, file_path.path()).with_lz4();
    assert!(jar.compressor().is_some());

    let columns = vec![clone_with_result(&col1), clone_with_result(&col2)];
    let frozen = jar.freeze(columns, num_rows).unwrap();

    let mut loaded = NippyJar::load_without_header(file_path.path()).unwrap();
    loaded.load_filters().unwrap();
    assert_eq!(frozen, loaded);

    if !matches!(loaded.compressor(), Some(Compressors::Lz4(_))) {
        panic!("Expected Lz4 compressor")
    }

    let mut cursor = NippyJarCursor::new(&loaded).unwrap();
    let mut seen = 0usize;
    while let Some(row) = cursor.next_row().unwrap() {
        let expected = (col1[seen].as_slice(), col2[seen].as_slice());
        assert_eq!((row[0], row[1]), expected);
        seen += 1;
    }
}
/// Freezes a two-column jar with zstd compression and dictionaries disabled, loads it back and
/// checks that a cursor returns every row unchanged and in order.
#[test]
fn test_zstd_no_dictionaries() {
    let (col1, col2) = test_data(None);
    let num_rows = col1.len() as u64;
    let num_columns = 2;
    let file_path = tempfile::NamedTempFile::new().unwrap();

    // A fresh jar has no compressor until one is requested.
    let plain = NippyJar::new_without_header(num_columns, file_path.path());
    assert!(plain.compressor().is_none());

    let jar = NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
    assert!(jar.compressor().is_some());

    let columns = vec![clone_with_result(&col1), clone_with_result(&col2)];
    let frozen = jar.freeze(columns, num_rows).unwrap();

    let mut loaded = NippyJar::load_without_header(file_path.path()).unwrap();
    loaded.load_filters().unwrap();
    assert_eq!(frozen, loaded);

    let zstd = match loaded.compressor() {
        Some(Compressors::Zstd(zstd)) => zstd,
        _ => panic!("Expected Zstd compressor"),
    };
    assert!(!zstd.use_dict);

    let mut cursor = NippyJarCursor::new(&loaded).unwrap();
    let mut seen = 0usize;
    while let Some(row) = cursor.next_row().unwrap() {
        let expected = (col1[seen].as_slice(), col2[seen].as_slice());
        assert_eq!((row[0], row[1]), expected);
        seen += 1;
    }
}
// End-to-end test with every feature enabled at once: a custom header, zstd with dictionaries,
// a cuckoo filter and a perfect hashing function.
#[test]
fn test_full_nippy_jar() {
let (col1, col2) = test_data(None);
let num_rows = col1.len() as u64;
let num_columns = 2;
let file_path = tempfile::NamedTempFile::new().unwrap();
let data = vec![col1.clone(), col2.clone()];
let block_start = 500;
// Minimal user-defined header used to check that header data survives the round trip.
#[derive(Serialize, Deserialize, Debug)]
struct BlockJarHeader {
block_start: usize,
}
// Write phase: build, prepare and freeze the jar.
{
let mut nippy =
NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
.with_zstd(true, 5000)
.with_cuckoo_filter(col1.len())
.with_fmph();
nippy.prepare_compression(data.clone()).unwrap();
// The first column provides the keys for the filter and the PHF.
nippy.prepare_index(clone_with_result(&col1), col1.len()).unwrap();
nippy
.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
.unwrap();
}
// Read phase: load the jar from disk and query it.
{
let mut loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();
loaded_nippy.load_filters().unwrap();
assert!(loaded_nippy.compressor().is_some());
assert!(loaded_nippy.filter.is_some());
assert!(loaded_nippy.phf.is_some());
assert_eq!(loaded_nippy.user_header().block_start, block_start);
// NOTE(review): if the compressor were not zstd, this branch would be skipped silently.
if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
// Sequential read: rows come back in insertion order.
let mut row_num = 0usize;
while let Some(row) = cursor.next_row().unwrap() {
assert_eq!(
(row[0], row[1]),
(data[0][row_num].as_slice(), data[1][row_num].as_slice())
);
row_num += 1;
}
// Random access: shuffle the rows, then look each one up by key and by row number.
let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
data.shuffle(&mut rand::thread_rng());
for (row_num, (v0, v1)) in data {
{
let row_by_value = cursor
.row_by_key(v0)
.unwrap()
.unwrap()
.iter()
.map(|a| a.to_vec())
.collect::<Vec<_>>();
assert_eq!((&row_by_value[0], &row_by_value[1]), (v0, v1));
// Both lookups must return the same row.
let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
assert_eq!(row_by_value, row_by_num);
}
}
}
}
}
// Checks the `*_with_cols` cursor lookups, which take a bit mask selecting the columns to
// return. As the assertions below show, bit 0 selects the first column and bit 1 the second.
#[test]
fn test_selectable_column_values() {
let (col1, col2) = test_data(None);
let num_rows = col1.len() as u64;
let num_columns = 2;
let file_path = tempfile::NamedTempFile::new().unwrap();
let data = vec![col1.clone(), col2.clone()];
// Write phase: zstd with dictionaries, cuckoo filter and PHF keyed on the first column.
{
let mut nippy = NippyJar::new_without_header(num_columns, file_path.path())
.with_zstd(true, 5000)
.with_cuckoo_filter(col1.len())
.with_fmph();
nippy.prepare_compression(data).unwrap();
nippy.prepare_index(clone_with_result(&col1), col1.len()).unwrap();
nippy
.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
.unwrap();
}
// Read phase.
{
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
loaded_nippy.load_filters().unwrap();
// NOTE(review): if the compressor were not zstd, this branch would be skipped silently.
if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
// Rows are visited in random order for every mask below.
let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
data.shuffle(&mut rand::thread_rng());
// Both bits set: both columns are returned.
const BLOCKS_FULL_MASK: usize = 0b11;
for (row_num, (v0, v1)) in &data {
let row_by_value = cursor
.row_by_key_with_cols(v0, BLOCKS_FULL_MASK)
.unwrap()
.unwrap()
.iter()
.map(|a| a.to_vec())
.collect::<Vec<_>>();
assert_eq!((&row_by_value[0], &row_by_value[1]), (*v0, *v1));
let row_by_num = cursor
.row_by_number_with_cols(*row_num, BLOCKS_FULL_MASK)
.unwrap()
.unwrap();
assert_eq!(row_by_value, row_by_num);
}
// Only bit 0 set: a single value is returned, and it is the first column.
const BLOCKS_BLOCK_MASK: usize = 0b01;
for (row_num, (v0, _)) in &data {
let row_by_value = cursor
.row_by_key_with_cols(v0, BLOCKS_BLOCK_MASK)
.unwrap()
.unwrap()
.iter()
.map(|a| a.to_vec())
.collect::<Vec<_>>();
assert_eq!(row_by_value.len(), 1);
assert_eq!(&row_by_value[0], *v0);
let row_by_num = cursor
.row_by_number_with_cols(*row_num, BLOCKS_BLOCK_MASK)
.unwrap()
.unwrap();
assert_eq!(row_by_num.len(), 1);
assert_eq!(row_by_value, row_by_num);
}
// Only bit 1 set: a single value is returned, and it is the second column. The key used
// for the lookup is still the first column's value.
const BLOCKS_WITHDRAWAL_MASK: usize = 0b10;
for (row_num, (v0, v1)) in &data {
let row_by_value = cursor
.row_by_key_with_cols(v0, BLOCKS_WITHDRAWAL_MASK)
.unwrap()
.unwrap()
.iter()
.map(|a| a.to_vec())
.collect::<Vec<_>>();
assert_eq!(row_by_value.len(), 1);
assert_eq!(&row_by_value[0], *v1);
let row_by_num = cursor
.row_by_number_with_cols(*row_num, BLOCKS_WITHDRAWAL_MASK)
.unwrap()
.unwrap();
assert_eq!(row_by_num.len(), 1);
assert_eq!(row_by_value, row_by_num);
}
// No bits set: the row is found but comes back empty.
const BLOCKS_EMPTY_MASK: usize = 0b00;
for (row_num, (v0, _)) in &data {
assert!(cursor
.row_by_key_with_cols(v0, BLOCKS_EMPTY_MASK)
.unwrap()
.unwrap()
.is_empty());
assert!(cursor
.row_by_number_with_cols(*row_num, BLOCKS_EMPTY_MASK)
.unwrap()
.unwrap()
.is_empty());
}
}
}
}
/// Runs the writer scenarios in sequence against one jar file: append, prune, append again,
/// then the two interrupted-append consistency checks. Each step builds on the state the
/// previous one left on disk.
#[test]
fn test_writer() {
    let (col1, col2) = test_data(None);
    let num_columns = 2;
    let file_path = tempfile::NamedTempFile::new().unwrap();
    let jar_path = file_path.path();

    append_two_rows(num_columns, jar_path, &col1, &col2);
    prune_rows(num_columns, jar_path, &col1, &col2);
    append_two_rows(num_columns, jar_path, &col1, &col2);

    test_append_consistency_no_commit(jar_path, &col1, &col2);
    test_append_consistency_partial_commit(jar_path, &col1, &col2);
}
/// For each `(missing_offsets, expected_rows)` scenario: writes two rows to a fresh jar,
/// truncates the data file as if a prune had been interrupted, lets the writer heal it, and
/// checks the row count of the reloaded jar.
#[test]
fn test_pruner() {
    let (col1, col2) = test_data(None);
    let num_columns = 2;
    let num_rows = 2;

    for (missing_offsets, expected_rows) in [(1, 1), (2, 1), (3, 0)] {
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let jar_path = file_path.path();

        append_two_rows(num_columns, jar_path, &col1, &col2);
        simulate_interrupted_prune(num_columns, jar_path, num_rows, missing_offsets);

        let reloaded = NippyJar::load_without_header(jar_path).unwrap();
        assert_eq!(reloaded.rows, expected_rows);
    }
}
// Simulates a writer that stops after committing offsets but before the full `commit`, then
// checks that a new writer created with `ConsistencyFailStrategy::Heal` restores the previous
// row count and file sizes.
fn test_append_consistency_partial_commit(
file_path: &Path,
col1: &[Vec<u8>],
col2: &[Vec<u8>],
) {
let nippy = NippyJar::load_without_header(file_path).unwrap();
// Snapshot of the state before the interrupted append.
let initial_rows = nippy.rows;
let initial_data_size =
File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
let initial_offset_size =
File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
assert!(initial_data_size > 0);
assert!(initial_offset_size > 0);
let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
writer.append_column(Some(Ok(&col1[2]))).unwrap();
writer.append_column(Some(Ok(&col2[2]))).unwrap();
// Drop the last in-memory offset so the offsets committed below are incomplete.
let _ = writer.offsets_mut().pop();
// Only the offsets are committed; the full `commit` is never called on this writer.
writer.commit_offsets().unwrap();
drop(writer);
// The reloaded configuration still reports the old row count ...
let nippy = NippyJar::load_without_header(file_path).unwrap();
assert_eq!(initial_rows, nippy.rows);
// ... while the data file holds both appended values ...
let new_data_size =
File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());
// ... and the offsets file grew by 8 bytes.
// NOTE(review): 8 bytes presumably equals one offset entry — confirm against the writer.
assert_eq!(
initial_offset_size + 8,
File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
);
// Creating a writer with `Heal` must bring rows, offsets and data back to the snapshot.
let writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
assert_eq!(initial_rows, writer.rows());
assert_eq!(
initial_offset_size,
File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize
);
assert_eq!(
initial_data_size,
File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
);
}
// Simulates a writer that appends a row and is dropped without any commit, then checks that a
// new writer created with `ConsistencyFailStrategy::Heal` restores the previous row count and
// data file size.
fn test_append_consistency_no_commit(file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
let nippy = NippyJar::load_without_header(file_path).unwrap();
// Snapshot of the state before the interrupted append.
let initial_rows = nippy.rows;
let initial_data_size =
File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
let initial_offset_size =
File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
assert!(initial_data_size > 0);
assert!(initial_offset_size > 0);
let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
writer.append_column(Some(Ok(&col1[2]))).unwrap();
writer.append_column(Some(Ok(&col2[2]))).unwrap();
// Dropped without calling `commit` or `commit_offsets`.
drop(writer);
// The configuration still reports the old row count ...
let nippy = NippyJar::load_without_header(file_path).unwrap();
assert_eq!(initial_rows, nippy.rows);
// ... the data file already holds both appended values ...
let new_data_size =
File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());
// ... and the offsets file is unchanged.
assert_eq!(
initial_offset_size,
File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
);
// Creating a writer with `Heal` must bring the data file back to its snapshot size.
let writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
assert_eq!(initial_rows, writer.rows());
assert_eq!(
initial_data_size,
File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
);
}
// Creates a fresh jar at `file_path` and appends two rows in two separate writer sessions,
// checking the writer state and the on-disk file sizes after each commit.
fn append_two_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
// First session: brand-new jar, first row.
{
let nippy = NippyJar::new_without_header(num_columns, file_path);
// Write the config first so the jar exists on disk before a writer is created.
nippy.freeze_config().unwrap();
assert_eq!(nippy.max_row_size, 0);
assert_eq!(nippy.rows, 0);
let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
// `column()` advances with each appended value and wraps back to 0 after the last column.
assert_eq!(writer.column(), 0);
writer.append_column(Some(Ok(&col1[0]))).unwrap();
assert_eq!(writer.column(), 1);
assert!(writer.is_dirty());
writer.append_column(Some(Ok(&col2[0]))).unwrap();
assert!(writer.is_dirty());
assert_eq!(writer.column(), 0);
// Two appended values leave three in-memory offsets.
assert_eq!(writer.offsets().len(), 3);
// The last in-memory offset is the data file size expected after the commit.
let expected_data_file_size = *writer.offsets().last().unwrap();
writer.commit().unwrap();
assert!(!writer.is_dirty());
assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
assert_eq!(writer.rows(), 1);
// Offsets file: 1 leading byte, 8 bytes per column value, plus 8 trailing bytes.
// NOTE(review): the trailing 8 bytes presumably hold the data file size — confirm.
assert_eq!(
File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
1 + num_columns as u64 * 8 + 8
);
assert_eq!(
File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
expected_data_file_size
);
}
// Second session: reload the jar from disk and append the second row.
{
let nippy = NippyJar::load_without_header(file_path).unwrap();
assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
assert_eq!(nippy.rows, 1);
let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
assert_eq!(writer.column(), 0);
writer.append_column(Some(Ok(&col1[1]))).unwrap();
assert_eq!(writer.column(), 1);
writer.append_column(Some(Ok(&col2[1]))).unwrap();
assert_eq!(writer.column(), 0);
// Again three in-memory offsets after appending one row in this session.
assert_eq!(writer.offsets().len(), 3);
let expected_data_file_size = *writer.offsets().last().unwrap();
writer.commit().unwrap();
// All values produced by `test_data` have the same length, so the maximum row size is
// unchanged by the second row.
assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
assert_eq!(writer.rows(), 2);
assert_eq!(
File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
);
assert_eq!(
File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
expected_data_file_size
);
}
}
// Prunes a jar that already holds two committed rows: first an uncommitted third row together
// with one committed row, then the remaining row, checking file sizes after each prune.
fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
let nippy = NippyJar::load_without_header(file_path).unwrap();
let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
// Append a third row without committing it, so the writer is dirty when pruning starts.
writer.append_column(Some(Ok(&col1[2]))).unwrap();
writer.append_column(Some(Ok(&col2[2]))).unwrap();
assert!(writer.is_dirty());
// Pruning two rows removes the uncommitted row and one committed row, leaving one.
writer.prune_rows(2).unwrap();
assert_eq!(writer.rows(), 1);
assert_eq!(
File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
);
// Only the first row's two values remain in the data file.
let expected_data_size = col1[0].len() + col2[0].len();
assert_eq!(
File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize,
expected_data_size
);
let nippy = NippyJar::load_without_header(file_path).unwrap();
{
let data_reader = nippy.open_data_reader().unwrap();
// With one row of two columns left, the entry at index 2 equals the data file size.
assert_eq!(data_reader.offset(2).unwrap(), expected_data_size as u64);
}
// Prune the last row: the data file becomes empty and the offsets file shrinks to 1 byte.
let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
writer.prune_rows(1).unwrap();
// The writer is still dirty after pruning and only becomes clean after `commit`.
assert!(writer.is_dirty());
assert_eq!(writer.rows(), 0);
assert_eq!(writer.max_row_size(), 0);
assert_eq!(File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize, 0);
assert_eq!(
File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize,
1
);
writer.commit().unwrap();
assert!(!writer.is_dirty());
}
// Truncates the data file by `32 * missing_offsets` bytes while leaving the offsets file
// untouched, then creates a writer with `ConsistencyFailStrategy::Heal` so it can repair the
// mismatch. The jar at `file_path` is expected to hold `num_rows` committed rows.
fn simulate_interrupted_prune(
num_columns: usize,
file_path: &Path,
num_rows: u64,
missing_offsets: u64,
) {
let nippy = NippyJar::load_without_header(file_path).unwrap();
let reader = nippy.open_data_reader().unwrap();
let offsets_file =
OpenOptions::new().read(true).write(true).open(nippy.offsets_path()).unwrap();
// Sanity check: the offsets file has the size expected for `num_rows` full rows.
let offsets_len = 1 + num_rows * num_columns as u64 * 8 + 8;
assert_eq!(offsets_len, offsets_file.metadata().unwrap().len());
let data_file = OpenOptions::new().read(true).write(true).open(nippy.data_path()).unwrap();
// The last stored offset equals the current data file length.
let data_len = reader.reverse_offset(0).unwrap();
assert_eq!(data_len, data_file.metadata().unwrap().len());
// Each value produced by `test_data` is 32 bytes long, so this cuts `missing_offsets` values
// off the end of the data file.
data_file.set_len(data_len - 32 * missing_offsets).unwrap();
// Creating the writer with `Heal` is what is being exercised; the writer itself is discarded.
let _ = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
}
}