1use crate::ChangesetOffset;
7use std::{
8 fs::{File, OpenOptions},
9 io::{self, Write},
10 os::unix::fs::FileExt,
11 path::Path,
12};
13
14#[derive(Debug)]
16pub struct ChangesetOffsetWriter {
17 file: File,
18 records_written: u64,
20}
21
22impl ChangesetOffsetWriter {
23 const RECORD_SIZE: usize = 16;
25
26 pub fn new(path: impl AsRef<Path>, committed_len: u64) -> io::Result<Self> {
36 let file = OpenOptions::new()
37 .create(true)
38 .truncate(false)
39 .read(true)
40 .write(true)
41 .open(path.as_ref())?;
42
43 let file_len = file.metadata()?.len();
44 let remainder = file_len % Self::RECORD_SIZE as u64;
45
46 let aligned_len = if remainder != 0 {
48 let truncated_len = file_len - remainder;
49 tracing::warn!(
50 target: "reth::static_file",
51 path = %path.as_ref().display(),
52 original_len = file_len,
53 truncated_len,
54 "Truncating partial changeset offset record"
55 );
56 file.set_len(truncated_len)?;
57 file.sync_all()?; truncated_len
59 } else {
60 file_len
61 };
62
63 let records_in_file = aligned_len / Self::RECORD_SIZE as u64;
64
65 match records_in_file.cmp(&committed_len) {
67 std::cmp::Ordering::Greater => {
68 let target_len = committed_len * Self::RECORD_SIZE as u64;
70 tracing::warn!(
71 target: "reth::static_file",
72 path = %path.as_ref().display(),
73 sidecar_records = records_in_file,
74 committed_len,
75 "Truncating uncommitted changeset offset records after crash recovery"
76 );
77 file.set_len(target_len)?;
78 file.sync_all()?; }
80 std::cmp::Ordering::Less => {
81 return Err(io::Error::new(
93 io::ErrorKind::InvalidData,
94 format!(
95 "INVARIANT VIOLATION: Changeset offset sidecar has {} records but header expects {} \
96 (healing should have prevented this - possible bug in healing logic): {}",
97 records_in_file,
98 committed_len,
99 path.as_ref().display()
100 ),
101 ));
102 }
103 std::cmp::Ordering::Equal => {}
104 }
105
106 let records_written = committed_len;
107 let file = OpenOptions::new().create(true).append(true).open(path)?;
108
109 Ok(Self { file, records_written })
110 }
111
112 pub fn append(&mut self, offset: &ChangesetOffset) -> io::Result<()> {
114 let mut buf = [0u8; Self::RECORD_SIZE];
115 buf[..8].copy_from_slice(&offset.offset().to_le_bytes());
116 buf[8..].copy_from_slice(&offset.num_changes().to_le_bytes());
117 self.file.write_all(&buf)?;
118 self.records_written += 1;
119 Ok(())
120 }
121
122 pub fn append_many(&mut self, offsets: &[ChangesetOffset]) -> io::Result<()> {
124 for offset in offsets {
125 self.append(offset)?;
126 }
127 Ok(())
128 }
129
130 pub fn sync(&mut self) -> io::Result<()> {
132 self.file.sync_all()
133 }
134
135 pub fn truncate(&mut self, len: u64) -> io::Result<()> {
141 self.file.set_len(len * Self::RECORD_SIZE as u64)?;
142 self.file.sync_all()?;
143 self.records_written = len;
144 Ok(())
145 }
146
147 pub const fn len(&self) -> u64 {
149 self.records_written
150 }
151
152 pub const fn is_empty(&self) -> bool {
154 self.records_written == 0
155 }
156}
157
158#[derive(Debug)]
160pub struct ChangesetOffsetReader {
161 file: File,
162 len: u64,
164}
165
166impl ChangesetOffsetReader {
167 const RECORD_SIZE: usize = 16;
169
170 pub fn new(path: impl AsRef<Path>, len: u64) -> io::Result<Self> {
175 let file = File::open(path)?;
176 Ok(Self { file, len })
177 }
178
179 pub fn get(&self, block_index: u64) -> io::Result<Option<ChangesetOffset>> {
182 if block_index >= self.len {
183 return Ok(None);
184 }
185
186 let byte_pos = block_index * Self::RECORD_SIZE as u64;
187 let mut buf = [0u8; Self::RECORD_SIZE];
188 self.file.read_exact_at(&mut buf, byte_pos)?;
189
190 let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
191 let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
192
193 Ok(Some(ChangesetOffset::new(offset, num_changes)))
194 }
195
196 pub fn get_range(&self, start: u64, end: u64) -> io::Result<Vec<ChangesetOffset>> {
198 let end = end.min(self.len);
199 if start >= end {
200 return Ok(Vec::new());
201 }
202
203 let count = (end - start) as usize;
204 let byte_pos = start * Self::RECORD_SIZE as u64;
205
206 let mut result = Vec::with_capacity(count);
207 let mut buf = [0u8; Self::RECORD_SIZE];
208
209 for i in 0..count {
210 let pos = byte_pos + (i as u64) * Self::RECORD_SIZE as u64;
211 self.file.read_exact_at(&mut buf, pos)?;
212 let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
213 let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
214 result.push(ChangesetOffset::new(offset, num_changes));
215 }
216
217 Ok(result)
218 }
219
220 pub const fn len(&self) -> u64 {
222 self.len
223 }
224
225 pub const fn is_empty(&self) -> bool {
227 self.len == 0
228 }
229}
230
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Opens a writer at `path` with the given committed length, appends every
    /// `(offset, num_changes)` pair in order, syncs, and hands the writer back.
    fn synced_writer(
        path: &std::path::Path,
        committed_len: u64,
        records: &[(u64, u64)],
    ) -> ChangesetOffsetWriter {
        let mut writer = ChangesetOffsetWriter::new(path, committed_len).unwrap();
        for &(offset, num_changes) in records {
            writer.append(&ChangesetOffset::new(offset, num_changes)).unwrap();
        }
        writer.sync().unwrap();
        writer
    }

    /// Asserts that record `index` is present and carries the expected field values.
    fn assert_record(reader: &ChangesetOffsetReader, index: u64, offset: u64, num_changes: u64) {
        let entry = reader.get(index).unwrap().unwrap();
        assert_eq!(entry.offset(), offset);
        assert_eq!(entry.num_changes(), num_changes);
    }

    /// Records written through the writer come back unchanged through the reader.
    #[test]
    fn test_write_and_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Write three records, then release the writer before reading.
        let writer = synced_writer(&path, 0, &[(0, 5), (5, 3), (8, 10)]);
        assert_eq!(writer.len(), 3);
        drop(writer);

        let reader = ChangesetOffsetReader::new(&path, 3).unwrap();
        assert_eq!(reader.len(), 3);

        assert_record(&reader, 0, 0, 5);
        assert_record(&reader, 1, 5, 3);
        assert_record(&reader, 2, 8, 10);

        // One past the end yields nothing.
        assert!(reader.get(3).unwrap().is_none());
    }

    /// Truncating the writer lowers its record count.
    #[test]
    fn test_truncate() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        let mut writer = synced_writer(&path, 0, &[(0, 1), (1, 2), (3, 3)]);

        writer.truncate(2).unwrap();
        assert_eq!(writer.len(), 2);

        // The writer stays open while the reader inspects the file.
        let reader = ChangesetOffsetReader::new(&path, 2).unwrap();
        assert_eq!(reader.len(), 2);
        assert!(reader.get(2).unwrap().is_none());
    }

    /// A trailing half-written record is removed when the writer opens the file.
    #[test]
    fn test_partial_record_recovery() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        {
            // One complete record (100, 5) followed by the first 8 bytes of a second one.
            let mut bytes = Vec::with_capacity(24);
            for word in [100u64, 5, 200] {
                bytes.extend_from_slice(&word.to_le_bytes());
            }
            let mut file = std::fs::File::create(&path).unwrap();
            file.write_all(&bytes).unwrap();
            file.sync_all().unwrap();
        }

        assert_eq!(std::fs::metadata(&path).unwrap().len(), 24);

        let writer = ChangesetOffsetWriter::new(&path, 1).unwrap();
        assert_eq!(writer.len(), 1);

        // Only the complete record remains on disk.
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 16);

        let reader = ChangesetOffsetReader::new(&path, 1).unwrap();
        assert_eq!(reader.len(), 1);
        assert_record(&reader, 0, 100, 5);
    }

    /// The reader never exposes records beyond the length it was constructed with.
    #[test]
    fn test_len_bounds_reads() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        let writer = synced_writer(&path, 0, &[(0, 10), (10, 20), (30, 30)]);
        assert_eq!(writer.len(), 3);
        drop(writer);

        // The file holds three records, but the reader is limited to two.
        let reader = ChangesetOffsetReader::new(&path, 2).unwrap();
        assert_eq!(reader.len(), 2);

        assert_record(&reader, 0, 0, 10);
        assert_record(&reader, 1, 10, 20);

        assert!(reader.get(2).unwrap().is_none());

        // Range requests are clamped to the reader's length.
        let range = reader.get_range(0, 5).unwrap();
        assert_eq!(range.len(), 2);
    }

    /// Records beyond the committed length are discarded on open and can be rewritten.
    #[test]
    fn test_truncate_uncommitted_records_on_open() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Three records on disk.
        let writer = synced_writer(&path, 0, &[(0, 5), (5, 10), (15, 7)]);
        assert_eq!(writer.len(), 3);
        drop(writer);

        // Reopen claiming only two were committed.
        let committed_len = 2u64;
        let writer = ChangesetOffsetWriter::new(&path, committed_len).unwrap();
        assert_eq!(writer.len(), 2);
        drop(writer);

        // Append a replacement third record.
        let mut writer = ChangesetOffsetWriter::new(&path, 2).unwrap();
        assert_eq!(writer.len(), 2);
        writer.append(&ChangesetOffset::new(15, 20)).unwrap();
        writer.sync().unwrap();
        assert_eq!(writer.len(), 3);
        drop(writer);

        let reader = ChangesetOffsetReader::new(&path, 3).unwrap();
        assert_eq!(reader.len(), 3);

        assert_record(&reader, 0, 0, 5);
        assert_record(&reader, 1, 5, 10);
        // The third record is the replacement, not the discarded (15, 7).
        assert_record(&reader, 2, 15, 20);
    }

    /// Opening with a committed length larger than the file's record count is an error.
    #[test]
    fn test_sidecar_shorter_than_committed_errors() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Only one record on disk.
        drop(synced_writer(&path, 0, &[(0, 5)]));

        let result = ChangesetOffsetWriter::new(&path, 3);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }
}