// reth_nippy_jar/cursor.rs
1use crate::{
2 compression::{Compression, Compressors, Zstd},
3 DataReader, NippyJar, NippyJarError, NippyJarHeader, RefRow,
4};
5use std::{ops::Range, sync::Arc};
6use zstd::bulk::Decompressor;
7
8#[derive(Clone)]
10pub struct NippyJarCursor<'a, H = ()> {
11 jar: &'a NippyJar<H>,
13 reader: Arc<DataReader>,
15 internal_buffer: Vec<u8>,
17 row: u64,
19}
20
21impl<H: NippyJarHeader> std::fmt::Debug for NippyJarCursor<'_, H> {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 f.debug_struct("NippyJarCursor").field("config", &self.jar).finish_non_exhaustive()
24 }
25}
26
27impl<'a, H: NippyJarHeader> NippyJarCursor<'a, H> {
28 pub fn new(jar: &'a NippyJar<H>) -> Result<Self, NippyJarError> {
30 let max_row_size = jar.max_row_size;
31 Ok(Self {
32 jar,
33 reader: Arc::new(jar.open_data_reader()?),
34 internal_buffer: Vec::with_capacity(max_row_size),
36 row: 0,
37 })
38 }
39
40 pub fn with_reader(
43 jar: &'a NippyJar<H>,
44 reader: Arc<DataReader>,
45 ) -> Result<Self, NippyJarError> {
46 let max_row_size = jar.max_row_size;
47 Ok(Self {
48 jar,
49 reader,
50 internal_buffer: Vec::with_capacity(max_row_size),
52 row: 0,
53 })
54 }
55
56 pub const fn jar(&self) -> &NippyJar<H> {
58 self.jar
59 }
60
61 pub const fn row_index(&self) -> u64 {
63 self.row
64 }
65
66 pub const fn reset(&mut self) {
68 self.row = 0;
69 }
70
71 pub fn row_by_number(&mut self, row: usize) -> Result<Option<RefRow<'_>>, NippyJarError> {
73 self.row = row as u64;
74 self.next_row()
75 }
76
77 pub fn next_row(&mut self) -> Result<Option<RefRow<'_>>, NippyJarError> {
79 self.internal_buffer.clear();
80
81 if self.row as usize >= self.jar.rows {
82 return Ok(None)
84 }
85
86 let mut row = Vec::with_capacity(self.jar.columns);
87
88 for column in 0..self.jar.columns {
90 self.read_value(column, &mut row)?;
91 }
92
93 self.row += 1;
94
95 Ok(Some(
96 row.into_iter()
97 .map(|v| match v {
98 ValueRange::Mmap(range) => self.reader.data(range),
99 ValueRange::Internal(range) => &self.internal_buffer[range],
100 })
101 .collect(),
102 ))
103 }
104
105 pub fn row_by_number_with_cols(
107 &mut self,
108 row: usize,
109 mask: usize,
110 ) -> Result<Option<RefRow<'_>>, NippyJarError> {
111 self.row = row as u64;
112 self.next_row_with_cols(mask)
113 }
114
115 pub fn next_row_with_cols(&mut self, mask: usize) -> Result<Option<RefRow<'_>>, NippyJarError> {
119 self.internal_buffer.clear();
120
121 if self.row as usize >= self.jar.rows {
122 return Ok(None)
124 }
125
126 let columns = self.jar.columns;
127 let mut row = Vec::with_capacity(columns);
128
129 for column in 0..columns {
130 if mask & (1 << column) != 0 {
131 self.read_value(column, &mut row)?
132 }
133 }
134 self.row += 1;
135
136 Ok(Some(
137 row.into_iter()
138 .map(|v| match v {
139 ValueRange::Mmap(range) => self.reader.data(range),
140 ValueRange::Internal(range) => &self.internal_buffer[range],
141 })
142 .collect(),
143 ))
144 }
145
146 fn read_value(
148 &mut self,
149 column: usize,
150 row: &mut Vec<ValueRange>,
151 ) -> Result<(), NippyJarError> {
152 let offset_pos = self.row as usize * self.jar.columns + column;
154 let value_offset = self.reader.offset(offset_pos)? as usize;
155
156 let column_offset_range = if self.jar.rows * self.jar.columns == offset_pos + 1 {
157 value_offset..self.reader.size()
159 } else {
160 let next_value_offset = self.reader.offset(offset_pos + 1)? as usize;
161 value_offset..next_value_offset
162 };
163
164 if let Some(compression) = self.jar.compressor() {
165 let from = self.internal_buffer.len();
166 match compression {
167 Compressors::Zstd(z) if z.use_dict => {
168 let dictionaries = z.dictionaries.as_ref().expect("dictionaries to exist")
172 [column]
173 .loaded()
174 .expect("dictionary to be loaded");
175 let mut decompressor = Decompressor::with_prepared_dictionary(dictionaries)?;
176 Zstd::decompress_with_dictionary(
177 self.reader.data(column_offset_range),
178 &mut self.internal_buffer,
179 &mut decompressor,
180 )?;
181 }
182 _ => {
183 compression.decompress_to(
185 self.reader.data(column_offset_range),
186 &mut self.internal_buffer,
187 )?;
188 }
189 }
190 let to = self.internal_buffer.len();
191
192 row.push(ValueRange::Internal(from..to));
193 } else {
194 row.push(ValueRange::Mmap(column_offset_range));
196 }
197
198 Ok(())
199 }
200}
201
/// Location of one column value of the row that is currently being assembled.
enum ValueRange {
    /// Byte range that is resolved through the cursor's data reader.
    Mmap(Range<usize>),
    /// Byte range inside the cursor's internal buffer.
    Internal(Range<usize>),
}