// reth_nippy_jar/writer.rs
1use crate::{
2 compression::Compression, ColumnResult, NippyJar, NippyJarChecker, NippyJarError,
3 NippyJarHeader,
4};
5use std::{
6 fs::{File, OpenOptions},
7 io::{BufWriter, Read, Seek, SeekFrom, Write},
8 path::Path,
9};
10
/// Size in bytes of each offset stored in the offsets file (offsets are written as
/// little-endian `u64`; see `commit_offsets_inner`).
pub(crate) const OFFSET_SIZE_BYTES: u8 = 8;
13
/// Writer for a [`NippyJar`]: appends column data to the data file and tracks the
/// matching byte offsets, which are buffered in memory until committed.
#[derive(Debug)]
pub struct NippyJarWriter<H: NippyJarHeader = ()> {
    /// Associated [`NippyJar`], holding configuration (columns, compressor, row counts).
    jar: NippyJar<H>,
    /// Buffered handle to the data file.
    data_file: BufWriter<File>,
    /// Buffered handle to the offsets file.
    offsets_file: BufWriter<File>,
    /// Scratch buffer reused across column writes for compression output.
    tmp_buf: Vec<u8>,
    /// Accumulated uncompressed size of the row currently being written; used to
    /// track `jar.max_row_size`.
    uncompressed_row_size: usize,
    /// In-memory offset list that has not yet been flushed to the offsets file.
    offsets: Vec<u64>,
    /// Index of the column the writer will write next within the current row.
    column: usize,
    /// Whether there are uncommitted changes (data, offsets, or user header).
    dirty: bool,
}
47
48impl<H: NippyJarHeader> NippyJarWriter<H> {
    /// Creates a [`NippyJarWriter`] from a [`NippyJar`].
    ///
    /// If the data/offsets files do not yet exist they are created and the jar
    /// configuration is frozen to disk. If they already exist, a
    /// [`NippyJarChecker`] verifies (and heals) consistency between the files and
    /// the configuration, and a `commit` is performed to persist any repairs.
    pub fn new(jar: NippyJar<H>) -> Result<Self, NippyJarError> {
        let (data_file, offsets_file, is_created) =
            Self::create_or_open_files(jar.data_path(), &jar.offsets_path())?;

        let (jar, data_file, offsets_file) = if is_created {
            // Fresh jar: persist the configuration before any data is written.
            jar.freeze_config()?;

            (jar, BufWriter::new(data_file), BufWriter::new(offsets_file))
        } else {
            // Existing jar: run the consistency checker, which re-opens the files
            // itself and may truncate them to a consistent state.
            let mut checker = NippyJarChecker::new(jar);
            checker.ensure_consistency()?;

            let NippyJarChecker { jar, data_file, offsets_file } = checker;

            // `ensure_consistency` succeeded, so both handles must be populated.
            (jar, data_file.expect("qed"), offsets_file.expect("qed"))
        };

        let mut writer = Self {
            jar,
            data_file,
            offsets_file,
            tmp_buf: Vec::with_capacity(1_000_000),
            uncompressed_row_size: 0,
            offsets: Vec::with_capacity(1_000_000),
            column: 0,
            dirty: false,
        };

        if !is_created {
            // Persist any changes the consistency check may have made.
            writer.commit()?;
        }

        Ok(writer)
    }
91
    /// Returns a reference to the user header `H` of the associated [`NippyJar`].
    pub const fn user_header(&self) -> &H {
        &self.jar.user_header
    }

    /// Returns a mutable reference to the user header `H`.
    ///
    /// Since the header may be mutated through the returned reference, the writer
    /// is conservatively marked dirty.
    pub const fn user_header_mut(&mut self) -> &mut H {
        self.dirty = true;
        &mut self.jar.user_header
    }

    /// Returns whether there are uncommitted changes.
    pub const fn is_dirty(&self) -> bool {
        self.dirty
    }

    /// Marks the writer as having uncommitted changes.
    pub const fn set_dirty(&mut self) {
        self.dirty = true
    }

    /// Returns the total number of rows in the jar.
    pub const fn rows(&self) -> usize {
        self.jar.rows()
    }

    /// Consumes the writer, returning the associated [`NippyJar`].
    pub fn into_jar(self) -> NippyJar<H> {
        self.jar
    }
125
126 fn create_or_open_files(
127 data: &Path,
128 offsets: &Path,
129 ) -> Result<(File, File, bool), NippyJarError> {
130 let is_created = !data.exists() || !offsets.exists();
131
132 if !data.exists() {
133 File::create(data)?;
135 }
136
137 let mut data_file = OpenOptions::new().read(true).write(true).open(data)?;
138 data_file.seek(SeekFrom::End(0))?;
139
140 if !offsets.exists() {
141 File::create(offsets)?;
143 }
144
145 let mut offsets_file = OpenOptions::new().read(true).write(true).open(offsets)?;
146 if is_created {
147 let mut buf = Vec::with_capacity(1 + OFFSET_SIZE_BYTES as usize);
148
149 buf.write_all(&[OFFSET_SIZE_BYTES])?;
151
152 buf.write_all(&[0; OFFSET_SIZE_BYTES as usize])?;
155
156 offsets_file.write_all(&buf)?;
157 offsets_file.seek(SeekFrom::End(0))?;
158 }
159
160 Ok((data_file, offsets_file, is_created))
161 }
162
163 pub fn append_rows(
169 &mut self,
170 column_values_per_row: Vec<impl IntoIterator<Item = ColumnResult<impl AsRef<[u8]>>>>,
171 num_rows: u64,
172 ) -> Result<(), NippyJarError> {
173 let mut column_iterators = column_values_per_row
174 .into_iter()
175 .map(|v| v.into_iter())
176 .collect::<Vec<_>>()
177 .into_iter();
178
179 for _ in 0..num_rows {
180 let mut iterators = Vec::with_capacity(self.jar.columns);
181
182 for mut column_iter in column_iterators {
183 self.append_column(column_iter.next())?;
184
185 iterators.push(column_iter);
186 }
187
188 column_iterators = iterators.into_iter();
189 }
190
191 Ok(())
192 }
193
194 pub fn append_column(
197 &mut self,
198 column: Option<ColumnResult<impl AsRef<[u8]>>>,
199 ) -> Result<(), NippyJarError> {
200 self.dirty = true;
201
202 match column {
203 Some(Ok(value)) => {
204 if self.offsets.is_empty() {
205 self.offsets.push(self.data_file.stream_position()?);
207 }
208
209 let written = self.write_column(value.as_ref())?;
210
211 self.offsets.push(self.offsets.last().expect("qed") + written as u64);
214 }
215 None => {
216 return Err(NippyJarError::UnexpectedMissingValue(
217 self.jar.rows as u64,
218 self.column as u64,
219 ))
220 }
221 Some(Err(err)) => return Err(err.into()),
222 }
223
224 Ok(())
225 }
226
227 fn write_column(&mut self, value: &[u8]) -> Result<usize, NippyJarError> {
229 self.uncompressed_row_size += value.len();
230 let len = if let Some(compression) = &self.jar.compressor {
231 let before = self.tmp_buf.len();
232 let len = compression.compress_to(value, &mut self.tmp_buf)?;
233 self.data_file.write_all(&self.tmp_buf[before..before + len])?;
234 len
235 } else {
236 self.data_file.write_all(value)?;
237 value.len()
238 };
239
240 self.column += 1;
241
242 if self.jar.columns == self.column {
243 self.finalize_row();
244 }
245
246 Ok(len)
247 }
248
    /// Prunes the last `num_rows` rows from the data and offsets files. `commit()`
    /// should be called afterwards to flush offsets and config to disk.
    ///
    /// Pruning happens in two stages: first from the in-memory (uncommitted) offset
    /// list, then — if more rows remain to prune — by truncating the on-disk
    /// offsets and data files.
    pub fn prune_rows(&mut self, num_rows: usize) -> Result<(), NippyJarError> {
        self.dirty = true;

        // Make sure all buffered writes are on disk before measuring/truncating.
        self.offsets_file.flush()?;
        self.data_file.flush()?;

        // Each row contributes one offset per column.
        let num_offsets = num_rows * self.jar.columns;

        // Prune as much as possible from the in-memory list first; `- 1` keeps the
        // seed offset (the data-file position) in place.
        let offsets_prune_count = num_offsets.min(self.offsets.len().saturating_sub(1));
        let remaining_to_prune = num_offsets.saturating_sub(offsets_prune_count);

        if offsets_prune_count > 0 {
            // The offset we truncate up to becomes the new data-file length.
            let new_len = self.offsets[(self.offsets.len() - 1) - offsets_prune_count];
            self.offsets.truncate(self.offsets.len() - offsets_prune_count);

            self.data_file.get_mut().set_len(new_len)?;
        }

        if remaining_to_prune > 0 {
            // Continue pruning from the offsets already written to disk.
            let length = self.offsets_file.get_ref().metadata()?.len();

            // Length > 1 means there are offsets after the single header byte.
            if length > 1 {
                // First byte is reserved for the `bytes per offset` header.
                let num_offsets = (length - 1) / OFFSET_SIZE_BYTES as u64;

                if remaining_to_prune as u64 > num_offsets {
                    return Err(NippyJarError::InvalidPruning(
                        num_offsets,
                        remaining_to_prune as u64,
                    ))
                }

                let new_num_offsets = num_offsets.saturating_sub(remaining_to_prune as u64);

                // <= 1 because a single remaining offset only marks the (now empty)
                // data-file end: everything is gone.
                if new_num_offsets <= 1 {
                    // Keep only the 1-byte header; data file becomes empty.
                    self.offsets_file.get_mut().set_len(1)?;
                    self.data_file.get_mut().set_len(0)?;
                } else {
                    let new_len = 1 + new_num_offsets * OFFSET_SIZE_BYTES as u64;
                    // Seek to the start of what will be the last surviving offset.
                    self.offsets_file
                        .seek(SeekFrom::Start(new_len.saturating_sub(OFFSET_SIZE_BYTES as u64)))?;
                    let mut last_offset = [0u8; OFFSET_SIZE_BYTES as usize];
                    self.offsets_file.get_ref().read_exact(&mut last_offset)?;
                    let last_offset = u64::from_le_bytes(last_offset);

                    // The last surviving offset is the new data-file length.
                    self.offsets_file.get_mut().set_len(new_len)?;
                    self.data_file.get_mut().set_len(last_offset)?;
                }
            } else {
                return Err(NippyJarError::InvalidPruning(0, remaining_to_prune as u64))
            }
        }

        self.offsets_file.get_ref().sync_all()?;
        self.data_file.get_ref().sync_all()?;

        // Re-sync both buffered writers with the (possibly truncated) file ends.
        self.offsets_file.seek(SeekFrom::End(0))?;
        self.data_file.seek(SeekFrom::End(0))?;

        self.jar.rows = self.jar.rows.saturating_sub(num_rows);
        if self.jar.rows == 0 {
            self.jar.max_row_size = 0;
        }
        self.jar.freeze_config()?;

        Ok(())
    }
331
332 fn finalize_row(&mut self) {
335 self.jar.max_row_size = self.jar.max_row_size.max(self.uncompressed_row_size);
336 self.jar.rows += 1;
337
338 self.tmp_buf.clear();
339 self.uncompressed_row_size = 0;
340 self.column = 0;
341 }
342
    /// Commits data, offsets, and configuration to disk, draining the in-memory
    /// offset list and clearing the dirty flag.
    pub fn commit(&mut self) -> Result<(), NippyJarError> {
        // Flush and fsync data first, so persisted offsets never point past
        // durable data.
        self.data_file.flush()?;
        self.data_file.get_ref().sync_all()?;

        self.commit_offsets()?;

        // Persists `max_row_size`, row count and the user header.
        self.jar.freeze_config()?;
        self.dirty = false;

        Ok(())
    }
356
    /// Commits data, offsets, and configuration without fsyncing.
    ///
    /// Test-only variant of [`Self::commit`]: buffers are flushed but durability
    /// (`sync_all`) is intentionally skipped.
    #[cfg(feature = "test-utils")]
    pub fn commit_without_sync_all(&mut self) -> Result<(), NippyJarError> {
        self.data_file.flush()?;

        self.commit_offsets_without_sync_all()?;

        // Persists `max_row_size`, row count and the user header.
        self.jar.freeze_config()?;
        self.dirty = false;

        Ok(())
    }

    /// Flushes the in-memory offsets to the offsets file and fsyncs it.
    pub(crate) fn commit_offsets(&mut self) -> Result<(), NippyJarError> {
        self.commit_offsets_inner()?;
        self.offsets_file.get_ref().sync_all()?;

        Ok(())
    }

    /// Flushes the in-memory offsets to the offsets file without fsyncing.
    #[cfg(feature = "test-utils")]
    fn commit_offsets_without_sync_all(&mut self) -> Result<(), NippyJarError> {
        self.commit_offsets_inner()
    }
386
    /// Appends the in-memory offset list to the offsets file, draining `self.offsets`.
    ///
    /// The first in-memory offset may duplicate the last offset already on disk
    /// (it is seeded from the data-file position on the first `append_column`
    /// after a commit), so that duplicate is skipped instead of written twice.
    fn commit_offsets_inner(&mut self) -> Result<(), NippyJarError> {
        // Read the last offset already on disk, if any. Length > 1 means at least
        // one offset exists after the single header byte.
        let mut last_offset_ondisk = if self.offsets_file.get_ref().metadata()?.len() > 1 {
            // Seeking through the `BufWriter` flushes its buffer before moving the
            // underlying cursor, so the raw read below observes flushed bytes.
            self.offsets_file.seek(SeekFrom::End(-(OFFSET_SIZE_BYTES as i64)))?;
            let mut buf = [0u8; OFFSET_SIZE_BYTES as usize];
            self.offsets_file.get_ref().read_exact(&mut buf)?;
            Some(u64::from_le_bytes(buf))
        } else {
            None
        };

        // Re-position at the end for appending; this also re-syncs the writer after
        // the raw `get_ref()` read advanced the underlying file cursor.
        self.offsets_file.seek(SeekFrom::End(0))?;

        // Append new offsets to disk.
        for offset in self.offsets.drain(..) {
            // Skip the first offset if it merely repeats the last one on disk.
            if let Some(last_offset_ondisk) = last_offset_ondisk.take() {
                if last_offset_ondisk == offset {
                    continue
                }
            }
            self.offsets_file.write_all(&offset.to_le_bytes())?;
        }
        self.offsets_file.flush()?;

        Ok(())
    }
418
    /// Returns the maximum uncompressed row size seen so far.
    #[cfg(test)]
    pub const fn max_row_size(&self) -> usize {
        self.jar.max_row_size
    }

    /// Returns the index of the column the writer will write next.
    #[cfg(test)]
    pub const fn column(&self) -> usize {
        self.column
    }

    /// Returns the in-memory (uncommitted) offsets.
    #[cfg(test)]
    pub fn offsets(&self) -> &[u64] {
        &self.offsets
    }

    /// Returns a mutable reference to the in-memory offsets.
    #[cfg(test)]
    pub const fn offsets_mut(&mut self) -> &mut Vec<u64> {
        &mut self.offsets
    }

    /// Returns the path of the offsets file.
    #[cfg(test)]
    pub fn offsets_path(&self) -> std::path::PathBuf {
        self.jar.offsets_path()
    }

    /// Returns the path of the data file.
    #[cfg(test)]
    pub fn data_path(&self) -> &Path {
        self.jar.data_path()
    }

    /// Returns a mutable handle to the underlying buffered data-file writer.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn data_file(&mut self) -> &mut BufWriter<File> {
        &mut self.data_file
    }

    /// Returns a reference to the associated [`NippyJar`].
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn jar(&self) -> &NippyJar<H> {
        &self.jar
    }
466}