// reth_nippy_jar/cursor.rs
use crate::{
compression::{Compression, Compressors, Zstd},
DataReader, NippyJar, NippyJarError, NippyJarHeader, RefRow,
};
use std::{ops::Range, sync::Arc};
use zstd::bulk::Decompressor;
/// Simple cursor implementation to retrieve data from [`NippyJar`].
///
/// The cursor tracks a current row position and returns rows as collections of byte slices that
/// borrow either from the data reader or from the cursor's own internal buffer.
///
/// Cloning (derived) copies the jar reference, clones the `Arc` handle so both cursors share the
/// same [`DataReader`], and clones the internal buffer and row position. As with any derived
/// `Clone`, it is only available when `H: Clone`.
#[derive(Clone)]
pub struct NippyJarCursor<'a, H = ()> {
/// [`NippyJar`] which holds most of the required configuration to read from the file.
jar: &'a NippyJar<H>,
/// Data and offset reader, shared behind an `Arc`.
reader: Arc<DataReader>,
/// Internal buffer to unload data to without reallocating memory on each retrieval.
///
/// It is cleared at the start of every row read, so slices handed out for one row are only
/// valid until the next read.
internal_buffer: Vec<u8>,
/// Cursor row position: the index of the row that the next read will return.
row: u64,
}
impl<H: NippyJarHeader> std::fmt::Debug for NippyJarCursor<'_, H> {
    /// Formats the cursor showing only its jar (under the `config` key); the remaining fields
    /// are elided and the output is marked as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("NippyJarCursor");
        builder.field("config", &self.jar);
        builder.finish_non_exhaustive()
    }
}
impl<'a, H: NippyJarHeader> NippyJarCursor<'a, H> {
/// Creates a new instance of [`NippyJarCursor`] for the given [`NippyJar`].
pub fn new(jar: &'a NippyJar<H>) -> Result<Self, NippyJarError> {
let max_row_size = jar.max_row_size;
Ok(Self {
jar,
reader: Arc::new(jar.open_data_reader()?),
// Makes sure that we have enough buffer capacity to decompress any row of data.
internal_buffer: Vec::with_capacity(max_row_size),
row: 0,
})
}
/// Creates a new instance of [`NippyJarCursor`] with the specified [`NippyJar`] and data
/// reader.
pub fn with_reader(
jar: &'a NippyJar<H>,
reader: Arc<DataReader>,
) -> Result<Self, NippyJarError> {
let max_row_size = jar.max_row_size;
Ok(Self {
jar,
reader,
// Makes sure that we have enough buffer capacity to decompress any row of data.
internal_buffer: Vec::with_capacity(max_row_size),
row: 0,
})
}
/// Returns a reference to the related [`NippyJar`]
pub const fn jar(&self) -> &NippyJar<H> {
self.jar
}
/// Returns current row index of the cursor
pub const fn row_index(&self) -> u64 {
self.row
}
/// Resets cursor to the beginning.
pub fn reset(&mut self) {
self.row = 0;
}
/// Returns a row by its number.
pub fn row_by_number(&mut self, row: usize) -> Result<Option<RefRow<'_>>, NippyJarError> {
self.row = row as u64;
self.next_row()
}
/// Returns the current value and advances the row.
pub fn next_row(&mut self) -> Result<Option<RefRow<'_>>, NippyJarError> {
self.internal_buffer.clear();
if self.row as usize >= self.jar.rows {
// Has reached the end
return Ok(None)
}
let mut row = Vec::with_capacity(self.jar.columns);
// Retrieve all column values from the row
for column in 0..self.jar.columns {
self.read_value(column, &mut row)?;
}
self.row += 1;
Ok(Some(
row.into_iter()
.map(|v| match v {
ValueRange::Mmap(range) => self.reader.data(range),
ValueRange::Internal(range) => &self.internal_buffer[range],
})
.collect(),
))
}
/// Returns a row by its number by using a `mask` to only read certain columns from the row.
pub fn row_by_number_with_cols(
&mut self,
row: usize,
mask: usize,
) -> Result<Option<RefRow<'_>>, NippyJarError> {
self.row = row as u64;
self.next_row_with_cols(mask)
}
/// Returns the current value and advances the row.
///
/// Uses a `mask` to only read certain columns from the row.
pub fn next_row_with_cols(&mut self, mask: usize) -> Result<Option<RefRow<'_>>, NippyJarError> {
self.internal_buffer.clear();
if self.row as usize >= self.jar.rows {
// Has reached the end
return Ok(None)
}
let columns = self.jar.columns;
let mut row = Vec::with_capacity(columns);
for column in 0..columns {
if mask & (1 << column) != 0 {
self.read_value(column, &mut row)?
}
}
self.row += 1;
Ok(Some(
row.into_iter()
.map(|v| match v {
ValueRange::Mmap(range) => self.reader.data(range),
ValueRange::Internal(range) => &self.internal_buffer[range],
})
.collect(),
))
}
/// Takes the column index and reads the range value for the corresponding column.
fn read_value(
&mut self,
column: usize,
row: &mut Vec<ValueRange>,
) -> Result<(), NippyJarError> {
// Find out the offset of the column value
let offset_pos = self.row as usize * self.jar.columns + column;
let value_offset = self.reader.offset(offset_pos)? as usize;
let column_offset_range = if self.jar.rows * self.jar.columns == offset_pos + 1 {
// It's the last column of the last row
value_offset..self.reader.size()
} else {
let next_value_offset = self.reader.offset(offset_pos + 1)? as usize;
value_offset..next_value_offset
};
if let Some(compression) = self.jar.compressor() {
let from = self.internal_buffer.len();
match compression {
Compressors::Zstd(z) if z.use_dict => {
// If we are here, then for sure we have the necessary dictionaries and they're
// loaded (happens during deserialization). Otherwise, there's an issue
// somewhere else and we can't recover here anyway.
let dictionaries = z.dictionaries.as_ref().expect("dictionaries to exist")
[column]
.loaded()
.expect("dictionary to be loaded");
let mut decompressor = Decompressor::with_prepared_dictionary(dictionaries)?;
Zstd::decompress_with_dictionary(
self.reader.data(column_offset_range),
&mut self.internal_buffer,
&mut decompressor,
)?;
}
_ => {
// Uses the chosen default decompressor
compression.decompress_to(
self.reader.data(column_offset_range),
&mut self.internal_buffer,
)?;
}
}
let to = self.internal_buffer.len();
row.push(ValueRange::Internal(from..to));
} else {
// Not compressed
row.push(ValueRange::Mmap(column_offset_range));
}
Ok(())
}
}
/// Helper type that stores the range of a column value either on a `mmap` slice or on the
/// internal buffer.
///
/// Ranges rather than slices are recorded while a row is being read, because the internal
/// buffer is still being appended to at that point.
enum ValueRange {
/// Byte range into the data exposed by the cursor's data reader; used when the value is not
/// compressed.
Mmap(Range<usize>),
/// Byte range into the cursor's internal buffer; used when the value was decompressed into it.
Internal(Range<usize>),
}