//! `NippyJar` writer implementation (`reth_nippy_jar/writer.rs`).

1use crate::{
2    compression::Compression, ColumnResult, NippyJar, NippyJarChecker, NippyJarError,
3    NippyJarHeader,
4};
5use std::{
6    fs::{File, OpenOptions},
7    io::{BufWriter, Read, Seek, SeekFrom, Write},
8    path::Path,
9};
10
/// Size of one offset in bytes.
///
/// Offsets are serialized as little-endian `u64` values, so this is
/// `size_of::<u64>()`. The value is also written as the first byte of the
/// offsets file so readers know the entry width.
pub(crate) const OFFSET_SIZE_BYTES: u8 = 8;
13
/// Writer of [`NippyJar`]. Handles table data and offsets only.
///
/// Table data is written directly to disk, while offsets and configuration need to be flushed by
/// calling `commit()`.
///
/// ## Offset file layout
/// The first byte is the size of a single offset in bytes, `m`.
/// Then, the file contains `n` entries, each with a size of `m`. Each entry represents an offset,
/// except for the last entry, which represents both the total size of the data file, as well as the
/// next offset to write new data to.
///
/// ## Data file layout
/// The data file is represented just as a sequence of bytes of data without any delimiters
#[derive(Debug)]
pub struct NippyJarWriter<H: NippyJarHeader = ()> {
    /// Associated [`NippyJar`], containing all necessary configurations for data
    /// handling.
    jar: NippyJar<H>,
    /// File handle to where the data is stored.
    data_file: BufWriter<File>,
    /// File handle to where the offsets are stored.
    offsets_file: BufWriter<File>,
    /// Temporary buffer to reuse when compressing data. Cleared once per row in
    /// `finalize_row()`.
    tmp_buf: Vec<u8>,
    /// Used to find the maximum uncompressed size of a row in a jar. Accumulated per
    /// column and reset in `finalize_row()`.
    uncompressed_row_size: usize,
    /// Partial offset list which hasn't been flushed to disk. Drained by
    /// `commit_offsets_inner()`; its last element always mirrors the current data-file
    /// length (i.e. the next write position).
    offsets: Vec<u64>,
    /// Column where writer is going to write next. Wraps back to 0 after the last
    /// column of a row is written (see `finalize_row()`).
    column: usize,
    /// Whether the writer has changed data that needs to be committed.
    dirty: bool,
}
47
impl<H: NippyJarHeader> NippyJarWriter<H> {
    /// Creates a [`NippyJarWriter`] from [`NippyJar`].
    ///
    /// It will **always** attempt to heal any inconsistent state when called.
    pub fn new(jar: NippyJar<H>) -> Result<Self, NippyJarError> {
        let (data_file, offsets_file, is_created) =
            Self::create_or_open_files(jar.data_path(), &jar.offsets_path())?;

        let (jar, data_file, offsets_file) = if is_created {
            // Makes sure we don't have dangling data and offset files when we just created the file
            jar.freeze_config()?;

            (jar, BufWriter::new(data_file), BufWriter::new(offsets_file))
        } else {
            // If we are opening a previously created jar, we need to check its consistency, and
            // make changes if necessary.
            let mut checker = NippyJarChecker::new(jar);
            checker.ensure_consistency()?;

            let NippyJarChecker { jar, data_file, offsets_file } = checker;

            // Calling ensure_consistency, will fill data_file and offsets_file
            (jar, data_file.expect("qed"), offsets_file.expect("qed"))
        };

        let mut writer = Self {
            jar,
            data_file,
            offsets_file,
            tmp_buf: Vec::with_capacity(1_000_000),
            uncompressed_row_size: 0,
            offsets: Vec::with_capacity(1_000_000),
            column: 0,
            dirty: false,
        };

        if !is_created {
            // Commit any potential heals done above.
            writer.commit()?;
        }

        Ok(writer)
    }

    /// Returns a reference to `H` of [`NippyJar`]
    pub const fn user_header(&self) -> &H {
        &self.jar.user_header
    }

    /// Returns a mutable reference to `H` of [`NippyJar`].
    ///
    /// Since there's no way of knowing if `H` has been actually changed, this sets `self.dirty` to
    /// true.
    pub const fn user_header_mut(&mut self) -> &mut H {
        self.dirty = true;
        &mut self.jar.user_header
    }

    /// Returns whether there are changes that need to be committed.
    pub const fn is_dirty(&self) -> bool {
        self.dirty
    }

    /// Sets writer as dirty.
    pub const fn set_dirty(&mut self) {
        self.dirty = true
    }

    /// Gets total writer rows in jar.
    pub const fn rows(&self) -> usize {
        self.jar.rows()
    }

    /// Consumes the writer and returns the associated [`NippyJar`].
    pub fn into_jar(self) -> NippyJar<H> {
        self.jar
    }

    /// Opens (or creates, if missing) the data and offsets files.
    ///
    /// Returns the two raw file handles plus a flag telling whether either file had to be
    /// created. When freshly created, the offsets file is seeded with its one-byte header
    /// (`OFFSET_SIZE_BYTES`) followed by a single zero offset representing the (empty)
    /// data-file length.
    fn create_or_open_files(
        data: &Path,
        offsets: &Path,
    ) -> Result<(File, File, bool), NippyJarError> {
        let is_created = !data.exists() || !offsets.exists();

        if !data.exists() {
            // File::create is write-only (no reading possible)
            File::create(data)?;
        }

        // Re-open read+write so the writer can both read back and append.
        let mut data_file = OpenOptions::new().read(true).write(true).open(data)?;
        data_file.seek(SeekFrom::End(0))?;

        if !offsets.exists() {
            // File::create is write-only (no reading possible)
            File::create(offsets)?;
        }

        let mut offsets_file = OpenOptions::new().read(true).write(true).open(offsets)?;
        if is_created {
            let mut buf = Vec::with_capacity(1 + OFFSET_SIZE_BYTES as usize);

            // First byte of the offset file is the size of one offset in bytes
            buf.write_all(&[OFFSET_SIZE_BYTES])?;

            // The last offset should always represent the data file len, which is 0 on
            // creation.
            buf.write_all(&[0; OFFSET_SIZE_BYTES as usize])?;

            offsets_file.write_all(&buf)?;
            offsets_file.seek(SeekFrom::End(0))?;
        }

        Ok((data_file, offsets_file, is_created))
    }

    /// Appends rows to data file.  `fn commit()` should be called to flush offsets and config to
    /// disk.
    ///
    /// `column_values_per_row`: A vector where each element is a column's values in sequence,
    /// corresponding to each row. The vector's length equals the number of columns.
    ///
    /// # Errors
    /// If any column iterator yields fewer than `num_rows` values, `append_column` receives
    /// `None` and this returns [`NippyJarError::UnexpectedMissingValue`].
    pub fn append_rows(
        &mut self,
        column_values_per_row: Vec<impl IntoIterator<Item = ColumnResult<impl AsRef<[u8]>>>>,
        num_rows: u64,
    ) -> Result<(), NippyJarError> {
        let mut column_iterators = column_values_per_row
            .into_iter()
            .map(|v| v.into_iter())
            .collect::<Vec<_>>()
            .into_iter();

        for _ in 0..num_rows {
            let mut iterators = Vec::with_capacity(self.jar.columns);

            // Consume one value from each column iterator (one full row), then rebuild
            // the set of iterators for the next row.
            for mut column_iter in column_iterators {
                self.append_column(column_iter.next())?;

                iterators.push(column_iter);
            }

            column_iterators = iterators.into_iter();
        }

        Ok(())
    }

    /// Appends a column to data file. `fn commit()` should be called to flush offsets and config to
    /// disk.
    ///
    /// # Errors
    /// Returns [`NippyJarError::UnexpectedMissingValue`] for a `None` value and propagates the
    /// inner error for `Some(Err(_))`.
    pub fn append_column(
        &mut self,
        column: Option<ColumnResult<impl AsRef<[u8]>>>,
    ) -> Result<(), NippyJarError> {
        self.dirty = true;

        match column {
            Some(Ok(value)) => {
                if self.offsets.is_empty() {
                    // Represents the offset of the soon to be appended data column
                    self.offsets.push(self.data_file.stream_position()?);
                }

                let written = self.write_column(value.as_ref())?;

                // Last offset represents the size of the data file if no more data is to be
                // appended. Otherwise, represents the offset of the next data item.
                self.offsets.push(self.offsets.last().expect("qed") + written as u64);
            }
            None => {
                return Err(NippyJarError::UnexpectedMissingValue(
                    self.jar.rows as u64,
                    self.column as u64,
                ))
            }
            Some(Err(err)) => return Err(err.into()),
        }

        Ok(())
    }

    /// Writes column to data file. If it's the last column of the row, call `finalize_row()`
    ///
    /// Returns the number of bytes actually written to the data file — the compressed length
    /// when a compressor is configured, otherwise `value.len()`.
    fn write_column(&mut self, value: &[u8]) -> Result<usize, NippyJarError> {
        self.uncompressed_row_size += value.len();
        let len = if let Some(compression) = &self.jar.compressor {
            // `compress_to` appends to `tmp_buf`; only the newly appended slice is written.
            let before = self.tmp_buf.len();
            let len = compression.compress_to(value, &mut self.tmp_buf)?;
            self.data_file.write_all(&self.tmp_buf[before..before + len])?;
            len
        } else {
            self.data_file.write_all(value)?;
            value.len()
        };

        self.column += 1;

        if self.jar.columns == self.column {
            self.finalize_row();
        }

        Ok(len)
    }

    /// Prunes rows from data and offsets file and updates its configuration on disk
    ///
    /// Pruning removes from the in-memory (uncommitted) offsets first, and only then
    /// truncates the on-disk offset list for whatever remains.
    ///
    /// # Errors
    /// Returns [`NippyJarError::InvalidPruning`] when asked to prune more offsets than exist
    /// on disk.
    pub fn prune_rows(&mut self, num_rows: usize) -> Result<(), NippyJarError> {
        self.dirty = true;

        // Flush buffered writes so file lengths/metadata below reflect reality.
        self.offsets_file.flush()?;
        self.data_file.flush()?;

        // Each column of a row is one offset
        let num_offsets = num_rows * self.jar.columns;

        // Calculate the number of offsets to prune from in-memory list
        let offsets_prune_count = num_offsets.min(self.offsets.len().saturating_sub(1)); // last element is the expected size of the data file
        let remaining_to_prune = num_offsets.saturating_sub(offsets_prune_count);

        // Prune in-memory offsets if needed
        if offsets_prune_count > 0 {
            // Determine new length based on the offset to prune up to
            let new_len = self.offsets[(self.offsets.len() - 1) - offsets_prune_count]; // last element is the expected size of the data file
            self.offsets.truncate(self.offsets.len() - offsets_prune_count);

            // Truncate the data file to the new length
            self.data_file.get_mut().set_len(new_len)?;
        }

        // Prune from on-disk offset list if there are still rows left to prune
        if remaining_to_prune > 0 {
            // Get the current length of the on-disk offset file
            let length = self.offsets_file.get_ref().metadata()?.len();

            // Handle non-empty offset file
            if length > 1 {
                // first byte is reserved for `bytes_per_offset`, which is 8 initially.
                let num_offsets = (length - 1) / OFFSET_SIZE_BYTES as u64;

                if remaining_to_prune as u64 > num_offsets {
                    return Err(NippyJarError::InvalidPruning(
                        num_offsets,
                        remaining_to_prune as u64,
                    ))
                }

                let new_num_offsets = num_offsets.saturating_sub(remaining_to_prune as u64);

                // If all rows are to be pruned
                if new_num_offsets <= 1 {
                    // <= 1 because the one offset would actually be the expected file data size
                    self.offsets_file.get_mut().set_len(1)?;
                    self.data_file.get_mut().set_len(0)?;
                } else {
                    // Calculate the new length for the on-disk offset list
                    let new_len = 1 + new_num_offsets * OFFSET_SIZE_BYTES as u64;
                    // Seek to the position of the last offset
                    self.offsets_file
                        .seek(SeekFrom::Start(new_len.saturating_sub(OFFSET_SIZE_BYTES as u64)))?;
                    // Read the last offset value. NOTE: reading through `get_ref()` advances
                    // the underlying file cursor behind the `BufWriter`'s back; the
                    // `SeekFrom::End(0)` seeks below restore a consistent position.
                    let mut last_offset = [0u8; OFFSET_SIZE_BYTES as usize];
                    self.offsets_file.get_ref().read_exact(&mut last_offset)?;
                    let last_offset = u64::from_le_bytes(last_offset);

                    // Update the lengths of both the offsets and data files. The retained
                    // last offset is, by the file-layout invariant, the new data-file size.
                    self.offsets_file.get_mut().set_len(new_len)?;
                    self.data_file.get_mut().set_len(last_offset)?;
                }
            } else {
                return Err(NippyJarError::InvalidPruning(0, remaining_to_prune as u64))
            }
        }

        self.offsets_file.get_ref().sync_all()?;
        self.data_file.get_ref().sync_all()?;

        // Re-position both writers at the (possibly truncated) end of their files.
        self.offsets_file.seek(SeekFrom::End(0))?;
        self.data_file.seek(SeekFrom::End(0))?;

        self.jar.rows = self.jar.rows.saturating_sub(num_rows);
        if self.jar.rows == 0 {
            self.jar.max_row_size = 0;
        }
        self.jar.freeze_config()?;

        Ok(())
    }

    /// Updates [`NippyJar`] with the new row count and maximum uncompressed row size, while
    /// resetting internal fields.
    fn finalize_row(&mut self) {
        self.jar.max_row_size = self.jar.max_row_size.max(self.uncompressed_row_size);
        self.jar.rows += 1;

        self.tmp_buf.clear();
        self.uncompressed_row_size = 0;
        self.column = 0;
    }

    /// Commits configuration and offsets to disk. It drains the internal offset list.
    pub fn commit(&mut self) -> Result<(), NippyJarError> {
        self.data_file.flush()?;
        self.data_file.get_ref().sync_all()?;

        self.commit_offsets()?;

        // Flushes `max_row_size` and total `rows` to disk.
        self.jar.freeze_config()?;
        self.dirty = false;

        Ok(())
    }

    /// Commits changes to the data file and offsets without synchronizing all data to disk.
    ///
    /// This function flushes the buffered data to the data file and commits the offsets,
    /// but it does not guarantee that all data is synchronized to persistent storage.
    #[cfg(feature = "test-utils")]
    pub fn commit_without_sync_all(&mut self) -> Result<(), NippyJarError> {
        self.data_file.flush()?;

        self.commit_offsets_without_sync_all()?;

        // Flushes `max_row_size` and total `rows` to disk.
        self.jar.freeze_config()?;
        self.dirty = false;

        Ok(())
    }

    /// Flushes offsets to disk.
    pub(crate) fn commit_offsets(&mut self) -> Result<(), NippyJarError> {
        self.commit_offsets_inner()?;
        self.offsets_file.get_ref().sync_all()?;

        Ok(())
    }

    #[cfg(feature = "test-utils")]
    fn commit_offsets_without_sync_all(&mut self) -> Result<(), NippyJarError> {
        self.commit_offsets_inner()
    }

    /// Flushes offsets to disk.
    ///
    /// CAUTION: Does not call `sync_all` on the offsets file and requires a manual call to
    /// `self.offsets_file.get_ref().sync_all()`.
    fn commit_offsets_inner(&mut self) -> Result<(), NippyJarError> {
        // The last offset on disk can be the first offset of `self.offsets` given how
        // `append_column()` works alongside commit. So we need to skip it.
        let mut last_offset_ondisk = if self.offsets_file.get_ref().metadata()?.len() > 1 {
            // `BufWriter::seek` flushes any buffered writes before seeking the inner file,
            // so the subsequent raw read through `get_ref()` sees a consistent file.
            self.offsets_file.seek(SeekFrom::End(-(OFFSET_SIZE_BYTES as i64)))?;
            let mut buf = [0u8; OFFSET_SIZE_BYTES as usize];
            self.offsets_file.get_ref().read_exact(&mut buf)?;
            Some(u64::from_le_bytes(buf))
        } else {
            None
        };

        self.offsets_file.seek(SeekFrom::End(0))?;

        // Appends new offsets to disk
        for offset in self.offsets.drain(..) {
            // Skip (at most) the first drained offset if it duplicates the last one on disk.
            if let Some(last_offset_ondisk) = last_offset_ondisk.take() {
                if last_offset_ondisk == offset {
                    continue
                }
            }
            self.offsets_file.write_all(&offset.to_le_bytes())?;
        }
        self.offsets_file.flush()?;

        Ok(())
    }

    /// Returns the maximum row size for the associated [`NippyJar`].
    #[cfg(test)]
    pub const fn max_row_size(&self) -> usize {
        self.jar.max_row_size
    }

    /// Returns the column index of the current checker instance.
    #[cfg(test)]
    pub const fn column(&self) -> usize {
        self.column
    }

    /// Returns a reference to the offsets vector.
    #[cfg(test)]
    pub fn offsets(&self) -> &[u64] {
        &self.offsets
    }

    /// Returns a mutable reference to the offsets vector.
    #[cfg(test)]
    pub const fn offsets_mut(&mut self) -> &mut Vec<u64> {
        &mut self.offsets
    }

    /// Returns the path to the offsets file for the associated [`NippyJar`].
    #[cfg(test)]
    pub fn offsets_path(&self) -> std::path::PathBuf {
        self.jar.offsets_path()
    }

    /// Returns the path to the data file for the associated [`NippyJar`].
    #[cfg(test)]
    pub fn data_path(&self) -> &Path {
        self.jar.data_path()
    }

    /// Returns a mutable reference to the buffered writer for the data file.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn data_file(&mut self) -> &mut BufWriter<File> {
        &mut self.data_file
    }

    /// Returns a reference to the associated [`NippyJar`] instance.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn jar(&self) -> &NippyJar<H> {
        &self.jar
    }
}