Skip to main content

reth_static_file_types/
changeset_offsets.rs

1//! Changeset offset sidecar file I/O.
2//!
3//! Provides append-only writing and O(1) random-access reading for changeset offsets.
4//! The file format is fixed-width 16-byte records: `[offset: u64 LE][num_changes: u64 LE]`.
5
6use crate::ChangesetOffset;
7use std::{
8    fs::{File, OpenOptions},
9    io::{self, Write},
10    os::unix::fs::FileExt,
11    path::Path,
12};
13
14/// Writer for appending changeset offsets to a sidecar file.
15#[derive(Debug)]
16pub struct ChangesetOffsetWriter {
17    file: File,
18    /// Number of records written.
19    records_written: u64,
20}
21
22impl ChangesetOffsetWriter {
23    /// Record size in bytes.
24    const RECORD_SIZE: usize = 16;
25
26    /// Opens or creates the changeset offset file for appending.
27    ///
28    /// The file is healed to match `committed_len` (from the segment header):
29    /// - Partial records (from crash mid-write) are truncated to record boundary
30    /// - Extra complete records (from crash after sidecar sync but before header commit) are
31    ///   truncated to match the committed length
32    /// - If the file has fewer records than committed, returns an error (data corruption)
33    ///
34    /// This mirrors `NippyJar`'s healing behavior where config/header is the commit boundary.
35    pub fn new(path: impl AsRef<Path>, committed_len: u64) -> io::Result<Self> {
36        let file = OpenOptions::new()
37            .create(true)
38            .truncate(false)
39            .read(true)
40            .write(true)
41            .open(path.as_ref())?;
42
43        let file_len = file.metadata()?.len();
44        let remainder = file_len % Self::RECORD_SIZE as u64;
45
46        // First, truncate any partial record from crash mid-write
47        let aligned_len = if remainder != 0 {
48            let truncated_len = file_len - remainder;
49            tracing::warn!(
50                target: "reth::static_file",
51                path = %path.as_ref().display(),
52                original_len = file_len,
53                truncated_len,
54                "Truncating partial changeset offset record"
55            );
56            file.set_len(truncated_len)?;
57            file.sync_all()?; // Sync required for crash safety
58            truncated_len
59        } else {
60            file_len
61        };
62
63        let records_in_file = aligned_len / Self::RECORD_SIZE as u64;
64
65        // Heal sidecar to match committed header length
66        match records_in_file.cmp(&committed_len) {
67            std::cmp::Ordering::Greater => {
68                // Sidecar has uncommitted records from a crash - truncate them
69                let target_len = committed_len * Self::RECORD_SIZE as u64;
70                tracing::warn!(
71                    target: "reth::static_file",
72                    path = %path.as_ref().display(),
73                    sidecar_records = records_in_file,
74                    committed_len,
75                    "Truncating uncommitted changeset offset records after crash recovery"
76                );
77                file.set_len(target_len)?;
78                file.sync_all()?; // Sync required for crash safety
79            }
80            std::cmp::Ordering::Less => {
81                // INVARIANT VIOLATION: This should be impossible if healing ran correctly.
82                //
83                // All code paths call `heal_changeset_sidecar()` before this function, which
84                // validates the sidecar against NippyJar state and corrects the header to match
85                // the actual file size. Therefore, `committed_len` should always equal or exceed
86                // `records_in_file` when this function is called.
87                //
88                // If we reach this error, it indicates:
89                // - A bug in the healing logic (header not corrected properly)
90                // - This function was called directly without going through healing
91                // - External corruption occurred between healing and opening (extremely unlikely)
92                return Err(io::Error::new(
93                    io::ErrorKind::InvalidData,
94                    format!(
95                        "INVARIANT VIOLATION: Changeset offset sidecar has {} records but header expects {} \
96                         (healing should have prevented this - possible bug in healing logic): {}",
97                        records_in_file,
98                        committed_len,
99                        path.as_ref().display()
100                    ),
101                ));
102            }
103            std::cmp::Ordering::Equal => {}
104        }
105
106        let records_written = committed_len;
107        let file = OpenOptions::new().create(true).append(true).open(path)?;
108
109        Ok(Self { file, records_written })
110    }
111
112    /// Appends a single changeset offset record.
113    pub fn append(&mut self, offset: &ChangesetOffset) -> io::Result<()> {
114        let mut buf = [0u8; Self::RECORD_SIZE];
115        buf[..8].copy_from_slice(&offset.offset().to_le_bytes());
116        buf[8..].copy_from_slice(&offset.num_changes().to_le_bytes());
117        self.file.write_all(&buf)?;
118        self.records_written += 1;
119        Ok(())
120    }
121
122    /// Appends multiple changeset offset records.
123    pub fn append_many(&mut self, offsets: &[ChangesetOffset]) -> io::Result<()> {
124        for offset in offsets {
125            self.append(offset)?;
126        }
127        Ok(())
128    }
129
130    /// Syncs all data to disk. Must be called before committing the header.
131    pub fn sync(&mut self) -> io::Result<()> {
132        self.file.sync_all()
133    }
134
135    /// Truncates the file to contain exactly `len` records and syncs to disk.
136    /// Used after prune operations to reclaim space.
137    ///
138    /// The sync is required for crash safety - without it, a crash could
139    /// resurrect the old file length.
140    pub fn truncate(&mut self, len: u64) -> io::Result<()> {
141        self.file.set_len(len * Self::RECORD_SIZE as u64)?;
142        self.file.sync_all()?;
143        self.records_written = len;
144        Ok(())
145    }
146
147    /// Returns the number of records in the file.
148    pub const fn len(&self) -> u64 {
149        self.records_written
150    }
151
152    /// Returns true if the file is empty.
153    pub const fn is_empty(&self) -> bool {
154        self.records_written == 0
155    }
156}
157
158/// Reader for changeset offsets with O(1) random access.
159#[derive(Debug)]
160pub struct ChangesetOffsetReader {
161    file: File,
162    /// Cached file length in records.
163    len: u64,
164}
165
166impl ChangesetOffsetReader {
167    /// Record size in bytes.
168    const RECORD_SIZE: usize = 16;
169
170    /// Opens the changeset offset file for reading with an explicit length.
171    ///
172    /// The `len` parameter (from header metadata) bounds the reader - any records
173    /// beyond this length are ignored. This ensures we only read committed data.
174    pub fn new(path: impl AsRef<Path>, len: u64) -> io::Result<Self> {
175        let file = File::open(path)?;
176        Ok(Self { file, len })
177    }
178
179    /// Reads a single changeset offset by block index.
180    /// Returns None if index is out of bounds.
181    pub fn get(&self, block_index: u64) -> io::Result<Option<ChangesetOffset>> {
182        if block_index >= self.len {
183            return Ok(None);
184        }
185
186        let byte_pos = block_index * Self::RECORD_SIZE as u64;
187        let mut buf = [0u8; Self::RECORD_SIZE];
188        self.file.read_exact_at(&mut buf, byte_pos)?;
189
190        let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
191        let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
192
193        Ok(Some(ChangesetOffset::new(offset, num_changes)))
194    }
195
196    /// Reads a range of changeset offsets.
197    pub fn get_range(&self, start: u64, end: u64) -> io::Result<Vec<ChangesetOffset>> {
198        let end = end.min(self.len);
199        if start >= end {
200            return Ok(Vec::new());
201        }
202
203        let count = (end - start) as usize;
204        let byte_pos = start * Self::RECORD_SIZE as u64;
205
206        let mut result = Vec::with_capacity(count);
207        let mut buf = [0u8; Self::RECORD_SIZE];
208
209        for i in 0..count {
210            let pos = byte_pos + (i as u64) * Self::RECORD_SIZE as u64;
211            self.file.read_exact_at(&mut buf, pos)?;
212            let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
213            let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
214            result.push(ChangesetOffset::new(offset, num_changes));
215        }
216
217        Ok(result)
218    }
219
220    /// Returns the number of valid records.
221    pub const fn len(&self) -> u64 {
222        self.len
223    }
224
225    /// Returns true if there are no records.
226    pub const fn is_empty(&self) -> bool {
227        self.len == 0
228    }
229}
230
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Opens a writer over `path` with `committed_len = 0`, appends every
    /// `(offset, num_changes)` pair, syncs, and returns the still-open writer.
    fn write_synced(path: &std::path::Path, records: &[(u64, u64)]) -> ChangesetOffsetWriter {
        let mut writer = ChangesetOffsetWriter::new(path, 0).unwrap();
        for &(offset, num_changes) in records {
            writer.append(&ChangesetOffset::new(offset, num_changes)).unwrap();
        }
        writer.sync().unwrap();
        writer
    }

    /// Asserts that the record at `index` exists and carries the expected values.
    fn assert_record(reader: &ChangesetOffsetReader, index: u64, offset: u64, num_changes: u64) {
        let record = reader.get(index).unwrap().unwrap();
        assert_eq!(record.offset(), offset);
        assert_eq!(record.num_changes(), num_changes);
    }

    #[test]
    fn test_write_and_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");
        let records: [(u64, u64); 3] = [(0, 5), (5, 3), (8, 10)];

        // Fresh file, nothing committed yet; the writer is closed before reading.
        {
            let writer = write_synced(&path, &records);
            assert_eq!(writer.len(), 3);
        }

        // Every record comes back in order, and the index past the end is absent.
        let reader = ChangesetOffsetReader::new(&path, 3).unwrap();
        assert_eq!(reader.len(), 3);
        for (index, &(offset, num_changes)) in records.iter().enumerate() {
            assert_record(&reader, index as u64, offset, num_changes);
        }
        assert!(reader.get(3).unwrap().is_none());
    }

    #[test]
    fn test_truncate() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        let mut writer = write_synced(&path, &[(0, 1), (1, 2), (3, 3)]);

        writer.truncate(2).unwrap();
        assert_eq!(writer.len(), 2);

        // The writer stays open while the reader inspects the shortened file.
        let reader = ChangesetOffsetReader::new(&path, 2).unwrap();
        assert_eq!(reader.len(), 2);
        assert!(reader.get(2).unwrap().is_none());
    }

    #[test]
    fn test_partial_record_recovery() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // One complete record (offset=100, num_changes=5) followed by the first 8 bytes of
        // a second record, i.e. 24 bytes in total.
        {
            let mut raw = Vec::with_capacity(24);
            raw.extend_from_slice(&100u64.to_le_bytes());
            raw.extend_from_slice(&5u64.to_le_bytes());
            raw.extend_from_slice(&200u64.to_le_bytes());

            let mut file = std::fs::File::create(&path).unwrap();
            file.write_all(&raw).unwrap();
            file.sync_all().unwrap();
        }
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 24);

        // The header committed a single record, so opening the writer must drop the
        // dangling half record and leave exactly 16 bytes behind.
        let writer = ChangesetOffsetWriter::new(&path, 1).unwrap();
        assert_eq!(writer.len(), 1);
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 16);

        // The surviving record is intact.
        let reader = ChangesetOffsetReader::new(&path, 1).unwrap();
        assert_eq!(reader.len(), 1);
        assert_record(&reader, 0, 100, 5);
    }

    #[test]
    fn test_len_bounds_reads() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Three records on disk.
        {
            let writer = write_synced(&path, &[(0, 10), (10, 20), (30, 30)]);
            assert_eq!(writer.len(), 3);
        }

        // A reader opened with len=2 must behave as if the third record did not exist.
        let reader = ChangesetOffsetReader::new(&path, 2).unwrap();
        assert_eq!(reader.len(), 2);

        assert_record(&reader, 0, 0, 10);
        assert_record(&reader, 1, 10, 20);
        assert!(reader.get(2).unwrap().is_none());

        // Range reads are clamped to the same bound.
        let range = reader.get_range(0, 5).unwrap();
        assert_eq!(range.len(), 2);
    }

    #[test]
    fn test_truncate_uncommitted_records_on_open() {
        // Crash-recovery scenario: the sidecar holds more records than the committed header
        // length, so `ChangesetOffsetWriter::new()` has to cut it back to `committed_len`.
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Three records were written and synced, but the header only committed two of them.
        {
            let writer = write_synced(&path, &[(0, 5), (5, 10), (15, 7)]);
            assert_eq!(writer.len(), 3);
        }

        // "Restart": opening with the committed length heals the file.
        let committed_len = 2u64;
        {
            let writer = ChangesetOffsetWriter::new(&path, committed_len).unwrap();
            assert_eq!(writer.len(), 2);
        }

        // A later writer starts from the healed length, so its append lands at index 2
        // rather than index 3.
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 2).unwrap();
            assert_eq!(writer.len(), 2);

            writer.append(&ChangesetOffset::new(15, 20)).unwrap();
            writer.sync().unwrap();
            assert_eq!(writer.len(), 3);
        }

        // Index 2 must hold the new record (num_changes=20), not the stale uncommitted one
        // (num_changes=7).
        let reader = ChangesetOffsetReader::new(&path, 3).unwrap();
        assert_eq!(reader.len(), 3);
        assert_record(&reader, 0, 0, 5);
        assert_record(&reader, 1, 5, 10);
        assert_record(&reader, 2, 15, 20);
    }

    #[test]
    fn test_sidecar_shorter_than_committed_errors() {
        // A sidecar with fewer records than the header committed is treated as corruption.
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Only one record on disk.
        drop(write_synced(&path, &[(0, 5)]));

        // The header claims three records, which the file cannot satisfy.
        let result = ChangesetOffsetWriter::new(&path, 3);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }
}