// reth_nippy_jar/writer.rs

1use crate::{
2    compression::Compression, ColumnResult, NippyJar, NippyJarChecker, NippyJarError,
3    NippyJarHeader,
4};
5use std::{
6    fs::{File, OpenOptions},
7    io::{BufWriter, Read, Seek, SeekFrom, Write},
8    path::Path,
9};
10
/// Size of one offset in bytes.
///
/// Written as the first byte of the offsets file so readers know the width of
/// every subsequent offset entry (see `create_or_open_files`).
pub(crate) const OFFSET_SIZE_BYTES: u8 = 8;
13
/// Writer of [`NippyJar`]. Handles table data and offsets only.
///
/// Table data is written directly to disk, while offsets and configuration need to be flushed by
/// calling `commit()`.
///
/// ## Offset file layout
/// The first byte is the size of a single offset in bytes, `m`.
/// Then, the file contains `n` entries, each with a size of `m`. Each entry represents an offset,
/// except for the last entry, which represents both the total size of the data file, as well as the
/// next offset to write new data to.
///
/// ## Data file layout
/// The data file is represented just as a sequence of bytes of data without any delimiters.
#[derive(Debug)]
pub struct NippyJarWriter<H: NippyJarHeader = ()> {
    /// Associated [`NippyJar`], containing all necessary configurations for data
    /// handling.
    jar: NippyJar<H>,
    /// Buffered file handle to where the data is stored.
    data_file: BufWriter<File>,
    /// Buffered file handle to where the offsets are stored.
    offsets_file: BufWriter<File>,
    /// Temporary buffer to reuse when compressing data. Cleared after each finished row.
    tmp_buf: Vec<u8>,
    /// Running byte count of the current row's uncompressed columns; used to track the
    /// maximum uncompressed size of a row in a jar.
    uncompressed_row_size: usize,
    /// Partial offset list which hasn't been flushed to disk yet (drained on commit).
    offsets: Vec<u64>,
    /// Column index where the writer is going to write next (wraps back to 0 after the
    /// last column of a row).
    column: usize,
    /// Whether the writer has changed data that needs to be committed.
    dirty: bool,
}
47
impl<H: NippyJarHeader> NippyJarWriter<H> {
    /// Creates a [`NippyJarWriter`] from [`NippyJar`].
    ///
    /// It will **always** attempt to heal any inconsistent state when called.
    pub fn new(jar: NippyJar<H>) -> Result<Self, NippyJarError> {
        let (data_file, offsets_file, is_created) =
            Self::create_or_open_files(jar.data_path(), &jar.offsets_path())?;

        let (jar, data_file, offsets_file) = if is_created {
            // Makes sure we don't have dangling data and offset files when we just created the
            // file.
            jar.freeze_config()?;

            (jar, BufWriter::new(data_file), BufWriter::new(offsets_file))
        } else {
            // If we are opening a previously created jar, we need to check its consistency, and
            // make changes if necessary.
            let mut checker = NippyJarChecker::new(jar);
            checker.ensure_consistency()?;

            let NippyJarChecker { jar, data_file, offsets_file } = checker;

            // Calling `ensure_consistency` fills `data_file` and `offsets_file`, so these
            // unwraps cannot fail.
            (jar, data_file.expect("qed"), offsets_file.expect("qed"))
        };

        let mut writer = Self {
            jar,
            data_file,
            offsets_file,
            tmp_buf: Vec::with_capacity(1_000_000),
            uncompressed_row_size: 0,
            offsets: Vec::with_capacity(1_000_000),
            column: 0,
            dirty: false,
        };

        if !is_created {
            // Commit any potential heals done above.
            writer.commit()?;
        }

        Ok(writer)
    }

    /// Returns a reference to `H` of [`NippyJar`].
    pub const fn user_header(&self) -> &H {
        &self.jar.user_header
    }

    /// Returns a mutable reference to `H` of [`NippyJar`].
    ///
    /// Since there's no way of knowing if `H` has been actually changed, this sets `self.dirty` to
    /// true.
    pub const fn user_header_mut(&mut self) -> &mut H {
        self.dirty = true;
        &mut self.jar.user_header
    }

    /// Returns whether there are changes that need to be committed.
    pub const fn is_dirty(&self) -> bool {
        self.dirty
    }

    /// Sets writer as dirty.
    pub const fn set_dirty(&mut self) {
        self.dirty = true
    }

    /// Gets total writer rows in jar.
    pub const fn rows(&self) -> usize {
        self.jar.rows()
    }

    /// Consumes the writer and returns the associated [`NippyJar`].
    pub fn into_jar(self) -> NippyJar<H> {
        self.jar
    }

    /// Creates (if missing) and opens the data and offsets files in read/write mode.
    ///
    /// Returns both handles (data cursor positioned at end-of-file) plus a flag indicating
    /// whether at least one of the files had to be created. On creation, the offsets file is
    /// initialized with its one-byte header ([`OFFSET_SIZE_BYTES`]) followed by a single
    /// zeroed offset representing the (empty) data file length.
    fn create_or_open_files(
        data: &Path,
        offsets: &Path,
    ) -> Result<(File, File, bool), NippyJarError> {
        let is_created = !data.exists() || !offsets.exists();

        if !data.exists() {
            // File::create is write-only (no reading possible)
            File::create(data)?;
        }

        let mut data_file = OpenOptions::new().read(true).write(true).open(data)?;
        data_file.seek(SeekFrom::End(0))?;

        if !offsets.exists() {
            // File::create is write-only (no reading possible)
            File::create(offsets)?;
        }

        let mut offsets_file = OpenOptions::new().read(true).write(true).open(offsets)?;
        if is_created {
            let mut buf = Vec::with_capacity(1 + OFFSET_SIZE_BYTES as usize);

            // First byte of the offset file is the size of one offset in bytes
            buf.write_all(&[OFFSET_SIZE_BYTES])?;

            // The last offset should always represent the data file len, which is 0 on
            // creation.
            buf.write_all(&[0; OFFSET_SIZE_BYTES as usize])?;

            offsets_file.write_all(&buf)?;
            offsets_file.seek(SeekFrom::End(0))?;
        }

        Ok((data_file, offsets_file, is_created))
    }

    /// Appends rows to data file. `fn commit()` should be called to flush offsets and config to
    /// disk.
    ///
    /// `column_values_per_row`: A vector where each element is a column's values in sequence,
    /// corresponding to each row. The vector's length equals the number of columns.
    pub fn append_rows(
        &mut self,
        column_values_per_row: Vec<impl IntoIterator<Item = ColumnResult<impl AsRef<[u8]>>>>,
        num_rows: u64,
    ) -> Result<(), NippyJarError> {
        let mut column_iterators = column_values_per_row
            .into_iter()
            .map(|v| v.into_iter())
            .collect::<Vec<_>>()
            .into_iter();

        for _ in 0..num_rows {
            // Rebuild the iterator list each row: each column iterator is moved out, advanced
            // by one value, and collected back so the next row continues where it left off.
            let mut iterators = Vec::with_capacity(self.jar.columns);

            for mut column_iter in column_iterators {
                self.append_column(column_iter.next())?;

                iterators.push(column_iter);
            }

            column_iterators = iterators.into_iter();
        }

        Ok(())
    }

    /// Appends a column to data file. `fn commit()` should be called to flush offsets and config to
    /// disk.
    pub fn append_column(
        &mut self,
        column: Option<ColumnResult<impl AsRef<[u8]>>>,
    ) -> Result<(), NippyJarError> {
        self.dirty = true;

        match column {
            Some(Ok(value)) => {
                if self.offsets.is_empty() {
                    // Represents the offset of the soon to be appended data column
                    self.offsets.push(self.data_file.stream_position()?);
                }

                let written = self.write_column(value.as_ref())?;

                // Last offset represents the size of the data file if no more data is to be
                // appended. Otherwise, represents the offset of the next data item.
                self.offsets.push(self.offsets.last().expect("qed") + written as u64);
            }
            None => {
                // A `None` here means the caller's iterator ran out before `num_rows` was
                // reached; report where the hole is (row, column).
                return Err(NippyJarError::UnexpectedMissingValue(
                    self.jar.rows as u64,
                    self.column as u64,
                ))
            }
            Some(Err(err)) => return Err(err.into()),
        }

        Ok(())
    }

    /// Writes column to data file. If it's the last column of the row, call `finalize_row()`.
    ///
    /// Returns the number of bytes actually written to the data file (the compressed length
    /// when a compressor is configured, otherwise the raw value length).
    fn write_column(&mut self, value: &[u8]) -> Result<usize, NippyJarError> {
        self.uncompressed_row_size += value.len();
        let len = if let Some(compression) = &self.jar.compressor {
            // `compress_to` appends to `tmp_buf`, so only the newly produced slice
            // `[before..before + len]` belongs to this column.
            let before = self.tmp_buf.len();
            let len = compression.compress_to(value, &mut self.tmp_buf)?;
            self.data_file.write_all(&self.tmp_buf[before..before + len])?;
            len
        } else {
            self.data_file.write_all(value)?;
            value.len()
        };

        self.column += 1;

        if self.jar.columns == self.column {
            self.finalize_row();
        }

        Ok(len)
    }

    /// Prunes rows from data and offsets file and updates its configuration on disk.
    ///
    /// Pruning first consumes the in-memory (uncommitted) offset list, then — if more rows
    /// remain to be pruned — truncates the on-disk offsets and data files.
    pub fn prune_rows(&mut self, num_rows: usize) -> Result<(), NippyJarError> {
        self.dirty = true;

        // Flush buffered writes so on-disk lengths/metadata below are accurate.
        self.offsets_file.flush()?;
        self.data_file.flush()?;

        // Each column of a row is one offset
        let num_offsets = num_rows * self.jar.columns;

        // Calculate the number of offsets to prune from in-memory list
        let offsets_prune_count = num_offsets.min(self.offsets.len().saturating_sub(1)); // last element is the expected size of the data file
        let remaining_to_prune = num_offsets.saturating_sub(offsets_prune_count);

        // Prune in-memory offsets if needed
        if offsets_prune_count > 0 {
            // Determine new length based on the offset to prune up to
            let new_len = self.offsets[(self.offsets.len() - 1) - offsets_prune_count]; // last element is the expected size of the data file
            self.offsets.truncate(self.offsets.len() - offsets_prune_count);

            // Truncate the data file to the new length
            self.data_file.get_mut().set_len(new_len)?;
        }

        // Prune from on-disk offset list if there are still rows left to prune
        if remaining_to_prune > 0 {
            // Get the current length of the on-disk offset file
            let length = self.offsets_file.get_ref().metadata()?.len();

            // Handle non-empty offset file
            if length > 1 {
                // first byte is reserved for `bytes_per_offset`, which is 8 initially.
                let num_offsets = (length - 1) / OFFSET_SIZE_BYTES as u64;

                if remaining_to_prune as u64 > num_offsets {
                    return Err(NippyJarError::InvalidPruning(
                        num_offsets,
                        remaining_to_prune as u64,
                    ))
                }

                let new_num_offsets = num_offsets.saturating_sub(remaining_to_prune as u64);

                // If all rows are to be pruned
                if new_num_offsets <= 1 {
                    // <= 1 because the one offset would actually be the expected file data size
                    //
                    // When no rows remain, keep the offset size byte and the final offset (data
                    // file size = 0). This maintains the same structure as when
                    // a file is initially created.
                    // See `NippyJarWriter::create_or_open_files` for the initial file format.
                    self.offsets_file.get_mut().set_len(1 + OFFSET_SIZE_BYTES as u64)?;
                    self.data_file.get_mut().set_len(0)?;
                } else {
                    // Calculate the new length for the on-disk offset list
                    let new_len = 1 + new_num_offsets * OFFSET_SIZE_BYTES as u64;
                    // Seek to the position of the last offset
                    self.offsets_file
                        .seek(SeekFrom::Start(new_len.saturating_sub(OFFSET_SIZE_BYTES as u64)))?;
                    // Read the last offset value
                    let mut last_offset = [0u8; OFFSET_SIZE_BYTES as usize];
                    self.offsets_file.get_ref().read_exact(&mut last_offset)?;
                    let last_offset = u64::from_le_bytes(last_offset);

                    // Update the lengths of both the offsets and data files
                    self.offsets_file.get_mut().set_len(new_len)?;
                    self.data_file.get_mut().set_len(last_offset)?;
                }
            } else {
                return Err(NippyJarError::InvalidPruning(0, remaining_to_prune as u64))
            }
        }

        self.offsets_file.get_ref().sync_all()?;
        self.data_file.get_ref().sync_all()?;

        // Restore both cursors to end-of-file so subsequent appends land after the truncation
        // point (the reads/truncations above may have moved the underlying cursors).
        self.offsets_file.seek(SeekFrom::End(0))?;
        self.data_file.seek(SeekFrom::End(0))?;

        self.jar.rows = self.jar.rows.saturating_sub(num_rows);
        if self.jar.rows == 0 {
            self.jar.max_row_size = 0;
        }
        self.jar.freeze_config()?;

        Ok(())
    }

    /// Updates [`NippyJar`] with the new row count and maximum uncompressed row size, while
    /// resetting internal fields.
    fn finalize_row(&mut self) {
        self.jar.max_row_size = self.jar.max_row_size.max(self.uncompressed_row_size);
        self.jar.rows += 1;

        self.tmp_buf.clear();
        self.uncompressed_row_size = 0;
        self.column = 0;
    }

    /// Commits configuration and offsets to disk. It drains the internal offset list.
    pub fn commit(&mut self) -> Result<(), NippyJarError> {
        self.data_file.flush()?;
        self.data_file.get_ref().sync_all()?;

        self.commit_offsets()?;

        // Flushes `max_row_size` and total `rows` to disk.
        self.jar.freeze_config()?;
        self.dirty = false;

        Ok(())
    }

    /// Commits changes to the data file and offsets without synchronizing all data to disk.
    ///
    /// This function flushes the buffered data to the data file and commits the offsets,
    /// but it does not guarantee that all data is synchronized to persistent storage.
    #[cfg(feature = "test-utils")]
    pub fn commit_without_sync_all(&mut self) -> Result<(), NippyJarError> {
        self.data_file.flush()?;

        self.commit_offsets_without_sync_all()?;

        // Flushes `max_row_size` and total `rows` to disk.
        self.jar.freeze_config()?;
        self.dirty = false;

        Ok(())
    }

    /// Flushes offsets to disk.
    pub(crate) fn commit_offsets(&mut self) -> Result<(), NippyJarError> {
        self.commit_offsets_inner()?;
        self.offsets_file.get_ref().sync_all()?;

        Ok(())
    }

    #[cfg(feature = "test-utils")]
    fn commit_offsets_without_sync_all(&mut self) -> Result<(), NippyJarError> {
        self.commit_offsets_inner()
    }

    /// Flushes offsets to disk.
    ///
    /// CAUTION: Does not call `sync_all` on the offsets file and requires a manual call to
    /// `self.offsets_file.get_ref().sync_all()`.
    fn commit_offsets_inner(&mut self) -> Result<(), NippyJarError> {
        // The last offset on disk can be the first offset of `self.offsets` given how
        // `append_column()` works alongside commit. So we need to skip it.
        let mut last_offset_ondisk = if self.offsets_file.get_ref().metadata()?.len() > 1 {
            self.offsets_file.seek(SeekFrom::End(-(OFFSET_SIZE_BYTES as i64)))?;
            let mut buf = [0u8; OFFSET_SIZE_BYTES as usize];
            self.offsets_file.get_ref().read_exact(&mut buf)?;
            Some(u64::from_le_bytes(buf))
        } else {
            None
        };

        self.offsets_file.seek(SeekFrom::End(0))?;

        // Appends new offsets to disk, skipping at most one leading entry that duplicates the
        // last offset already on disk (`take()` ensures the check happens only once).
        for offset in self.offsets.drain(..) {
            if let Some(last_offset_ondisk) = last_offset_ondisk.take() &&
                last_offset_ondisk == offset
            {
                continue
            }
            self.offsets_file.write_all(&offset.to_le_bytes())?;
        }
        self.offsets_file.flush()?;

        Ok(())
    }

    /// Returns the maximum row size for the associated [`NippyJar`].
    #[cfg(test)]
    pub const fn max_row_size(&self) -> usize {
        self.jar.max_row_size
    }

    /// Returns the column index of the current checker instance.
    #[cfg(test)]
    pub const fn column(&self) -> usize {
        self.column
    }

    /// Returns a reference to the offsets vector.
    #[cfg(test)]
    pub fn offsets(&self) -> &[u64] {
        &self.offsets
    }

    /// Returns a mutable reference to the offsets vector.
    #[cfg(test)]
    pub const fn offsets_mut(&mut self) -> &mut Vec<u64> {
        &mut self.offsets
    }

    /// Returns the path to the offsets file for the associated [`NippyJar`].
    #[cfg(test)]
    pub fn offsets_path(&self) -> std::path::PathBuf {
        self.jar.offsets_path()
    }

    /// Returns the path to the data file for the associated [`NippyJar`].
    #[cfg(test)]
    pub fn data_path(&self) -> &Path {
        self.jar.data_path()
    }

    /// Returns a mutable reference to the buffered writer for the data file.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn data_file(&mut self) -> &mut BufWriter<File> {
        &mut self.data_file
    }

    /// Returns a reference to the associated [`NippyJar`] instance.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn jar(&self) -> &NippyJar<H> {
        &self.jar
    }
}