reth_nippy_jar/consistency.rs
1use crate::{writer::OFFSET_SIZE_BYTES, NippyJar, NippyJarError, NippyJarHeader};
2use std::{
3 cmp::Ordering,
4 fs::{File, OpenOptions},
5 io::{BufWriter, Seek, SeekFrom},
6 path::Path,
7};
8
9/// Performs consistency checks or heals on the [`NippyJar`] file
10/// * Is the offsets file size expected?
11/// * Is the data file size expected?
12///
13/// This is based on the assumption that [`NippyJar`] configuration is **always** the last one
14/// to be updated when something is written, as by the `NippyJarWriter::commit()` function shows.
15///
16/// **For checks (read-only) use `check_consistency` method.**
17///
18/// **For heals (read-write) use `ensure_consistency` method.**
19#[derive(Debug)]
20pub struct NippyJarChecker<H: NippyJarHeader = ()> {
21 /// Associated [`NippyJar`], containing all necessary configurations for data
22 /// handling.
23 pub(crate) jar: NippyJar<H>,
24 /// File handle to where the data is stored.
25 pub(crate) data_file: Option<BufWriter<File>>,
26 /// File handle to where the offsets are stored.
27 pub(crate) offsets_file: Option<BufWriter<File>>,
28}
29
30impl<H: NippyJarHeader> NippyJarChecker<H> {
31 /// Creates a new instance of [`NippyJarChecker`] with the provided [`NippyJar`].
32 ///
33 /// This method initializes the checker without any associated file handles for
34 /// the data or offsets files. The [`NippyJar`] passed in contains all necessary
35 /// configurations for handling data.
36 pub const fn new(jar: NippyJar<H>) -> Self {
37 Self { jar, data_file: None, offsets_file: None }
38 }
39
40 /// It will throw an error if the [`NippyJar`] is in a inconsistent state.
41 pub fn check_consistency(&mut self) -> Result<(), NippyJarError> {
42 self.handle_consistency(ConsistencyFailStrategy::ThrowError)
43 }
44
45 /// It will attempt to heal if the [`NippyJar`] is in a inconsistent state.
46 ///
47 /// **ATTENTION**: disk commit should be handled externally by consuming `Self`
48 pub fn ensure_consistency(&mut self) -> Result<(), NippyJarError> {
49 self.handle_consistency(ConsistencyFailStrategy::Heal)
50 }
51
52 fn handle_consistency(&mut self, mode: ConsistencyFailStrategy) -> Result<(), NippyJarError> {
53 self.load_files(mode)?;
54 let mut reader = self.jar.open_data_reader()?;
55
56 // When an offset size is smaller than the initial (8), we are dealing with immutable
57 // data.
58 if reader.offset_size() != OFFSET_SIZE_BYTES {
59 return Err(NippyJarError::FrozenJar)
60 }
61
62 let expected_offsets_file_size: u64 = (1 + // first byte is the size of one offset
63 OFFSET_SIZE_BYTES as usize* self.jar.rows * self.jar.columns + // `offset size * num rows * num columns`
64 OFFSET_SIZE_BYTES as usize) as u64; // expected size of the data file
65 let actual_offsets_file_size = self.offsets_file().get_ref().metadata()?.len();
66
67 if mode.should_err() &&
68 expected_offsets_file_size.cmp(&actual_offsets_file_size) != Ordering::Equal
69 {
70 return Err(NippyJarError::InconsistentState)
71 }
72
73 // Offsets configuration wasn't properly committed
74 match expected_offsets_file_size.cmp(&actual_offsets_file_size) {
75 Ordering::Less => {
76 // Happened during an appending job
77 // TODO: ideally we could truncate until the last offset of the last column of the
78 // last row inserted
79
80 // Windows has locked the file with the mmap handle, so we need to drop it
81 drop(reader);
82
83 self.offsets_file().get_mut().set_len(expected_offsets_file_size)?;
84 reader = self.jar.open_data_reader()?;
85 }
86 Ordering::Greater => {
87 // Happened during a pruning job
88 // `num rows = (file size - 1 - size of one offset) / num columns`
89 self.jar.rows = ((actual_offsets_file_size.
90 saturating_sub(1). // first byte is the size of one offset
91 saturating_sub(OFFSET_SIZE_BYTES as u64) / // expected size of the data file
92 (self.jar.columns as u64)) /
93 OFFSET_SIZE_BYTES as u64) as usize;
94
95 // Freeze row count changed
96 self.jar.freeze_config()?;
97 }
98 Ordering::Equal => {}
99 }
100
101 // last offset should match the data_file_len
102 let last_offset = reader.reverse_offset(0)?;
103 let data_file_len = self.data_file().get_ref().metadata()?.len();
104
105 if mode.should_err() && last_offset.cmp(&data_file_len) != Ordering::Equal {
106 return Err(NippyJarError::InconsistentState)
107 }
108
109 // Offset list wasn't properly committed
110 match last_offset.cmp(&data_file_len) {
111 Ordering::Less => {
112 // Windows has locked the file with the mmap handle, so we need to drop it
113 drop(reader);
114
115 // Happened during an appending job, so we need to truncate the data, since there's
116 // no way to recover it.
117 self.data_file().get_mut().set_len(last_offset)?;
118 }
119 Ordering::Greater => {
120 // Happened during a pruning job, so we need to reverse iterate offsets until we
121 // find the matching one.
122 for index in 0..reader.offsets_count()? {
123 let offset = reader.reverse_offset(index + 1)?;
124 // It would only be equal if the previous row was fully pruned.
125 if offset <= data_file_len {
126 let new_len = self
127 .offsets_file()
128 .get_ref()
129 .metadata()?
130 .len()
131 .saturating_sub(OFFSET_SIZE_BYTES as u64 * (index as u64 + 1));
132
133 // Windows has locked the file with the mmap handle, so we need to drop it
134 drop(reader);
135
136 self.offsets_file().get_mut().set_len(new_len)?;
137
138 // Since we decrease the offset list, we need to check the consistency of
139 // `self.jar.rows` again
140 self.handle_consistency(ConsistencyFailStrategy::Heal)?;
141 break
142 }
143 }
144 }
145 Ordering::Equal => {}
146 }
147
148 self.offsets_file().seek(SeekFrom::End(0))?;
149 self.data_file().seek(SeekFrom::End(0))?;
150
151 Ok(())
152 }
153
154 /// Loads data and offsets files.
155 fn load_files(&mut self, mode: ConsistencyFailStrategy) -> Result<(), NippyJarError> {
156 let load_file = |path: &Path| -> Result<BufWriter<File>, NippyJarError> {
157 let path = path
158 .exists()
159 .then_some(path)
160 .ok_or_else(|| NippyJarError::MissingFile(path.to_path_buf()))?;
161 Ok(BufWriter::new(OpenOptions::new().read(true).write(mode.should_heal()).open(path)?))
162 };
163 self.data_file = Some(load_file(self.jar.data_path())?);
164 self.offsets_file = Some(load_file(&self.jar.offsets_path())?);
165 Ok(())
166 }
167
168 /// Returns a mutable reference to offsets file.
169 ///
170 /// **Panics** if it does not exist.
171 const fn offsets_file(&mut self) -> &mut BufWriter<File> {
172 self.offsets_file.as_mut().expect("should exist")
173 }
174
175 /// Returns a mutable reference to data file.
176 ///
177 /// **Panics** if it does not exist.
178 const fn data_file(&mut self) -> &mut BufWriter<File> {
179 self.data_file.as_mut().expect("should exist")
180 }
181}
182
/// How [`NippyJarChecker`] reacts when it encounters an inconsistent state.
#[derive(Debug, Copy, Clone)]
enum ConsistencyFailStrategy {
    /// The inconsistency should be healed.
    Heal,
    /// The inconsistency should be reported as an error.
    ThrowError,
}

impl ConsistencyFailStrategy {
    /// Returns `true` only for the [`Self::Heal`] strategy.
    const fn should_heal(&self) -> bool {
        match self {
            Self::Heal => true,
            Self::ThrowError => false,
        }
    }

    /// Returns `true` only for the [`Self::ThrowError`] strategy.
    const fn should_err(&self) -> bool {
        match self {
            Self::ThrowError => true,
            Self::Heal => false,
        }
    }
}