reth_nippy_jar/
consistency.rs

1use crate::{writer::OFFSET_SIZE_BYTES, NippyJar, NippyJarError, NippyJarHeader};
2use std::{
3    cmp::Ordering,
4    fs::{File, OpenOptions},
5    io::{BufWriter, Seek, SeekFrom},
6    path::Path,
7};
8
/// Checks, and optionally heals, the on-disk consistency of a [`NippyJar`].
///
/// Two questions are answered:
/// * Does the offsets file have the size implied by the jar configuration?
/// * Does the data file have the size recorded by the last offset?
///
/// The logic relies on the assumption that the [`NippyJar`] configuration is **always** the last
/// thing to be updated when something is written, as `NippyJarWriter::commit()` does.
///
/// **For read-only checks use [`Self::check_consistency`].**
///
/// **For read-write heals use [`Self::ensure_consistency`].**
#[derive(Debug)]
pub struct NippyJarChecker<H: NippyJarHeader = ()> {
    /// Associated [`NippyJar`], containing all necessary configurations for data
    /// handling.
    pub(crate) jar: NippyJar<H>,
    /// Buffered handle to the file where the data is stored.
    ///
    /// `None` until the files have been loaded by a check or heal.
    pub(crate) data_file: Option<BufWriter<File>>,
    /// Buffered handle to the file where the offsets are stored.
    ///
    /// `None` until the files have been loaded by a check or heal.
    pub(crate) offsets_file: Option<BufWriter<File>>,
}
29
30impl<H: NippyJarHeader> NippyJarChecker<H> {
31    /// Creates a new instance of [`NippyJarChecker`] with the provided [`NippyJar`].
32    ///
33    /// This method initializes the checker without any associated file handles for
34    /// the data or offsets files. The [`NippyJar`] passed in contains all necessary
35    /// configurations for handling data.
36    pub const fn new(jar: NippyJar<H>) -> Self {
37        Self { jar, data_file: None, offsets_file: None }
38    }
39
40    /// It will throw an error if the [`NippyJar`] is in a inconsistent state.
41    pub fn check_consistency(&mut self) -> Result<(), NippyJarError> {
42        self.handle_consistency(ConsistencyFailStrategy::ThrowError)
43    }
44
45    /// It will attempt to heal if the [`NippyJar`] is in a inconsistent state.
46    ///
47    /// **ATTENTION**: disk commit should be handled externally by consuming `Self`
48    pub fn ensure_consistency(&mut self) -> Result<(), NippyJarError> {
49        self.handle_consistency(ConsistencyFailStrategy::Heal)
50    }
51
52    fn handle_consistency(&mut self, mode: ConsistencyFailStrategy) -> Result<(), NippyJarError> {
53        self.load_files(mode)?;
54        let mut reader = self.jar.open_data_reader()?;
55
56        // When an offset size is smaller than the initial (8), we are dealing with immutable
57        // data.
58        if reader.offset_size() != OFFSET_SIZE_BYTES {
59            return Err(NippyJarError::FrozenJar)
60        }
61
62        let expected_offsets_file_size: u64 = (1 + // first byte is the size of one offset
63                OFFSET_SIZE_BYTES as usize* self.jar.rows * self.jar.columns + // `offset size * num rows * num columns`
64                OFFSET_SIZE_BYTES as usize) as u64; // expected size of the data file
65        let actual_offsets_file_size = self.offsets_file().get_ref().metadata()?.len();
66
67        if mode.should_err() &&
68            expected_offsets_file_size.cmp(&actual_offsets_file_size) != Ordering::Equal
69        {
70            return Err(NippyJarError::InconsistentState)
71        }
72
73        // Offsets configuration wasn't properly committed
74        match expected_offsets_file_size.cmp(&actual_offsets_file_size) {
75            Ordering::Less => {
76                // Happened during an appending job
77                // TODO: ideally we could truncate until the last offset of the last column of the
78                //  last row inserted
79
80                // Windows has locked the file with the mmap handle, so we need to drop it
81                drop(reader);
82
83                self.offsets_file().get_mut().set_len(expected_offsets_file_size)?;
84                reader = self.jar.open_data_reader()?;
85            }
86            Ordering::Greater => {
87                // Happened during a pruning job
88                // `num rows = (file size - 1 - size of one offset) / num columns`
89                self.jar.rows = ((actual_offsets_file_size.
90                        saturating_sub(1). // first byte is the size of one offset
91                        saturating_sub(OFFSET_SIZE_BYTES as u64) / // expected size of the data file
92                        (self.jar.columns as u64)) /
93                    OFFSET_SIZE_BYTES as u64) as usize;
94
95                // Freeze row count changed
96                self.jar.freeze_config()?;
97            }
98            Ordering::Equal => {}
99        }
100
101        // last offset should match the data_file_len
102        let last_offset = reader.reverse_offset(0)?;
103        let data_file_len = self.data_file().get_ref().metadata()?.len();
104
105        if mode.should_err() && last_offset.cmp(&data_file_len) != Ordering::Equal {
106            return Err(NippyJarError::InconsistentState)
107        }
108
109        // Offset list wasn't properly committed
110        match last_offset.cmp(&data_file_len) {
111            Ordering::Less => {
112                // Windows has locked the file with the mmap handle, so we need to drop it
113                drop(reader);
114
115                // Happened during an appending job, so we need to truncate the data, since there's
116                // no way to recover it.
117                self.data_file().get_mut().set_len(last_offset)?;
118            }
119            Ordering::Greater => {
120                // Happened during a pruning job, so we need to reverse iterate offsets until we
121                // find the matching one.
122                for index in 0..reader.offsets_count()? {
123                    let offset = reader.reverse_offset(index + 1)?;
124                    // It would only be equal if the previous row was fully pruned.
125                    if offset <= data_file_len {
126                        let new_len = self
127                            .offsets_file()
128                            .get_ref()
129                            .metadata()?
130                            .len()
131                            .saturating_sub(OFFSET_SIZE_BYTES as u64 * (index as u64 + 1));
132
133                        // Windows has locked the file with the mmap handle, so we need to drop it
134                        drop(reader);
135
136                        self.offsets_file().get_mut().set_len(new_len)?;
137
138                        // Since we decrease the offset list, we need to check the consistency of
139                        // `self.jar.rows` again
140                        self.handle_consistency(ConsistencyFailStrategy::Heal)?;
141                        break
142                    }
143                }
144            }
145            Ordering::Equal => {}
146        }
147
148        self.offsets_file().seek(SeekFrom::End(0))?;
149        self.data_file().seek(SeekFrom::End(0))?;
150
151        Ok(())
152    }
153
154    /// Loads data and offsets files.
155    fn load_files(&mut self, mode: ConsistencyFailStrategy) -> Result<(), NippyJarError> {
156        let load_file = |path: &Path| -> Result<BufWriter<File>, NippyJarError> {
157            let path = path
158                .exists()
159                .then_some(path)
160                .ok_or_else(|| NippyJarError::MissingFile(path.to_path_buf()))?;
161            Ok(BufWriter::new(OpenOptions::new().read(true).write(mode.should_heal()).open(path)?))
162        };
163        self.data_file = Some(load_file(self.jar.data_path())?);
164        self.offsets_file = Some(load_file(&self.jar.offsets_path())?);
165        Ok(())
166    }
167
168    /// Returns a mutable reference to offsets file.
169    ///
170    /// **Panics** if it does not exist.
171    const fn offsets_file(&mut self) -> &mut BufWriter<File> {
172        self.offsets_file.as_mut().expect("should exist")
173    }
174
175    /// Returns a mutable reference to data file.
176    ///
177    /// **Panics** if it does not exist.
178    const fn data_file(&mut self) -> &mut BufWriter<File> {
179        self.data_file.as_mut().expect("should exist")
180    }
181}
182
/// How [`NippyJarChecker`] reacts when it finds an inconsistent state.
#[derive(Debug, Copy, Clone)]
enum ConsistencyFailStrategy {
    /// Repair the inconsistency.
    Heal,
    /// Report the inconsistency as an error.
    ThrowError,
}

impl ConsistencyFailStrategy {
    /// Returns `true` only for [`Self::Heal`].
    const fn should_heal(&self) -> bool {
        match self {
            Self::Heal => true,
            Self::ThrowError => false,
        }
    }

    /// Returns `true` only for [`Self::ThrowError`].
    const fn should_err(&self) -> bool {
        match self {
            Self::ThrowError => true,
            Self::Heal => false,
        }
    }
}