Skip to main content

reth_static_file_types/
changeset_offsets.rs

1//! Changeset offset sidecar file I/O.
2//!
3//! Provides append-only writing and O(1) random-access reading for changeset offsets.
4//! The file format is fixed-width 16-byte records: `[offset: u64 LE][num_changes: u64 LE]`.
5
6use crate::ChangesetOffset;
7use std::{
8    fs::{File, OpenOptions},
9    io::{self, Read, Seek, SeekFrom, Write},
10    path::Path,
11};
12
13/// Writer for appending changeset offsets to a sidecar file.
14#[derive(Debug)]
15pub struct ChangesetOffsetWriter {
16    file: File,
17    /// Number of records written.
18    records_written: u64,
19}
20
21impl ChangesetOffsetWriter {
22    /// Record size in bytes.
23    const RECORD_SIZE: usize = 16;
24
25    /// Opens or creates the changeset offset file for appending.
26    ///
27    /// The file is healed to match `committed_len` (from the segment header):
28    /// - Partial records (from crash mid-write) are truncated to record boundary
29    /// - Extra complete records (from crash after sidecar sync but before header commit) are
30    ///   truncated to match the committed length
31    /// - If the file has fewer records than committed, returns an error (data corruption)
32    ///
33    /// This mirrors `NippyJar`'s healing behavior where config/header is the commit boundary.
34    pub fn new(path: impl AsRef<Path>, committed_len: u64) -> io::Result<Self> {
35        let file = OpenOptions::new()
36            .create(true)
37            .truncate(false)
38            .read(true)
39            .write(true)
40            .open(path.as_ref())?;
41
42        let file_len = file.metadata()?.len();
43        let remainder = file_len % Self::RECORD_SIZE as u64;
44
45        // First, truncate any partial record from crash mid-write
46        let aligned_len = if remainder != 0 {
47            let truncated_len = file_len - remainder;
48            tracing::warn!(
49                target: "reth::static_file",
50                path = %path.as_ref().display(),
51                original_len = file_len,
52                truncated_len,
53                "Truncating partial changeset offset record"
54            );
55            file.set_len(truncated_len)?;
56            file.sync_all()?; // Sync required for crash safety
57            truncated_len
58        } else {
59            file_len
60        };
61
62        let records_in_file = aligned_len / Self::RECORD_SIZE as u64;
63
64        // Heal sidecar to match committed header length
65        match records_in_file.cmp(&committed_len) {
66            std::cmp::Ordering::Greater => {
67                // Sidecar has uncommitted records from a crash - truncate them
68                let target_len = committed_len * Self::RECORD_SIZE as u64;
69                tracing::warn!(
70                    target: "reth::static_file",
71                    path = %path.as_ref().display(),
72                    sidecar_records = records_in_file,
73                    committed_len,
74                    "Truncating uncommitted changeset offset records after crash recovery"
75                );
76                file.set_len(target_len)?;
77                file.sync_all()?; // Sync required for crash safety
78            }
79            std::cmp::Ordering::Less => {
80                // INVARIANT VIOLATION: This should be impossible if healing ran correctly.
81                //
82                // All code paths call `heal_changeset_sidecar()` before this function, which
83                // validates the sidecar against NippyJar state and corrects the header to match
84                // the actual file size. Therefore, `committed_len` should always equal or exceed
85                // `records_in_file` when this function is called.
86                //
87                // If we reach this error, it indicates:
88                // - A bug in the healing logic (header not corrected properly)
89                // - This function was called directly without going through healing
90                // - External corruption occurred between healing and opening (extremely unlikely)
91                return Err(io::Error::new(
92                    io::ErrorKind::InvalidData,
93                    format!(
94                        "INVARIANT VIOLATION: Changeset offset sidecar has {} records but header expects {} \
95                         (healing should have prevented this - possible bug in healing logic): {}",
96                        records_in_file,
97                        committed_len,
98                        path.as_ref().display()
99                    ),
100                ));
101            }
102            std::cmp::Ordering::Equal => {}
103        }
104
105        let records_written = committed_len;
106        let file = OpenOptions::new().create(true).append(true).open(path)?;
107
108        Ok(Self { file, records_written })
109    }
110
111    /// Appends a single changeset offset record.
112    pub fn append(&mut self, offset: &ChangesetOffset) -> io::Result<()> {
113        let mut buf = [0u8; Self::RECORD_SIZE];
114        buf[..8].copy_from_slice(&offset.offset().to_le_bytes());
115        buf[8..].copy_from_slice(&offset.num_changes().to_le_bytes());
116        self.file.write_all(&buf)?;
117        self.records_written += 1;
118        Ok(())
119    }
120
121    /// Appends multiple changeset offset records.
122    pub fn append_many(&mut self, offsets: &[ChangesetOffset]) -> io::Result<()> {
123        for offset in offsets {
124            self.append(offset)?;
125        }
126        Ok(())
127    }
128
129    /// Syncs all data to disk. Must be called before committing the header.
130    pub fn sync(&mut self) -> io::Result<()> {
131        self.file.sync_all()
132    }
133
134    /// Truncates the file to contain exactly `len` records and syncs to disk.
135    /// Used after prune operations to reclaim space.
136    ///
137    /// The sync is required for crash safety - without it, a crash could
138    /// resurrect the old file length.
139    pub fn truncate(&mut self, len: u64) -> io::Result<()> {
140        self.file.set_len(len * Self::RECORD_SIZE as u64)?;
141        self.file.sync_all()?;
142        self.records_written = len;
143        Ok(())
144    }
145
146    /// Returns the number of records in the file.
147    pub const fn len(&self) -> u64 {
148        self.records_written
149    }
150
151    /// Returns true if the file is empty.
152    pub const fn is_empty(&self) -> bool {
153        self.records_written == 0
154    }
155}
156
157/// Reader for changeset offsets with O(1) random access.
158#[derive(Debug)]
159pub struct ChangesetOffsetReader {
160    file: File,
161    /// Cached file length in records.
162    len: u64,
163}
164
165impl ChangesetOffsetReader {
166    /// Record size in bytes.
167    const RECORD_SIZE: usize = 16;
168
169    /// Opens the changeset offset file for reading with an explicit length.
170    ///
171    /// The `len` parameter (from header metadata) bounds the reader - any records
172    /// beyond this length are ignored. This ensures we only read committed data.
173    pub fn new(path: impl AsRef<Path>, len: u64) -> io::Result<Self> {
174        let file = File::open(path)?;
175        Ok(Self { file, len })
176    }
177
178    /// Reads a single changeset offset by block index.
179    /// Returns None if index is out of bounds.
180    pub fn get(&mut self, block_index: u64) -> io::Result<Option<ChangesetOffset>> {
181        if block_index >= self.len {
182            return Ok(None);
183        }
184
185        let byte_pos = block_index * Self::RECORD_SIZE as u64;
186        self.file.seek(SeekFrom::Start(byte_pos))?;
187
188        let mut buf = [0u8; Self::RECORD_SIZE];
189        self.file.read_exact(&mut buf)?;
190
191        let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
192        let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
193
194        Ok(Some(ChangesetOffset::new(offset, num_changes)))
195    }
196
197    /// Reads a range of changeset offsets.
198    pub fn get_range(&mut self, start: u64, end: u64) -> io::Result<Vec<ChangesetOffset>> {
199        let end = end.min(self.len);
200        if start >= end {
201            return Ok(Vec::new());
202        }
203
204        let count = (end - start) as usize;
205        let byte_pos = start * Self::RECORD_SIZE as u64;
206        self.file.seek(SeekFrom::Start(byte_pos))?;
207
208        let mut result = Vec::with_capacity(count);
209        let mut buf = [0u8; Self::RECORD_SIZE];
210
211        for _ in 0..count {
212            self.file.read_exact(&mut buf)?;
213            let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
214            let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
215            result.push(ChangesetOffset::new(offset, num_changes));
216        }
217
218        Ok(result)
219    }
220
221    /// Returns the number of valid records.
222    pub const fn len(&self) -> u64 {
223        self.len
224    }
225
226    /// Returns true if there are no records.
227    pub const fn is_empty(&self) -> bool {
228        self.len == 0
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Flattens decoded offsets into `(offset, num_changes)` pairs for easy comparison.
    fn pairs(offsets: &[ChangesetOffset]) -> Vec<(u64, u64)> {
        offsets.iter().map(|entry| (entry.offset(), entry.num_changes())).collect()
    }

    /// Returns the on-disk size of `path` in bytes.
    fn file_len(path: &Path) -> u64 {
        std::fs::metadata(path).unwrap().len()
    }

    #[test]
    fn test_write_and_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Write (new file, committed_len=0)
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
            writer.append(&ChangesetOffset::new(0, 5)).unwrap();
            writer.append(&ChangesetOffset::new(5, 3)).unwrap();
            writer.append(&ChangesetOffset::new(8, 10)).unwrap();
            writer.sync().unwrap();
            assert_eq!(writer.len(), 3);
        }

        // Read
        {
            let mut reader = ChangesetOffsetReader::new(&path, 3).unwrap();
            assert_eq!(reader.len(), 3);

            let entry = reader.get(0).unwrap().unwrap();
            assert_eq!(entry.offset(), 0);
            assert_eq!(entry.num_changes(), 5);

            let entry = reader.get(1).unwrap().unwrap();
            assert_eq!(entry.offset(), 5);
            assert_eq!(entry.num_changes(), 3);

            let entry = reader.get(2).unwrap().unwrap();
            assert_eq!(entry.offset(), 8);
            assert_eq!(entry.num_changes(), 10);

            assert!(reader.get(3).unwrap().is_none());
        }
    }

    #[test]
    fn test_new_creates_empty_file() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        let writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
        assert!(writer.is_empty());
        assert_eq!(writer.len(), 0);
        assert_eq!(file_len(&path), 0);

        let mut reader = ChangesetOffsetReader::new(&path, 0).unwrap();
        assert!(reader.is_empty());
        assert!(reader.get(0).unwrap().is_none());
        assert!(reader.get_range(0, 10).unwrap().is_empty());
    }

    #[test]
    fn test_append_many_and_get_range() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        {
            let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
            writer
                .append_many(&[
                    ChangesetOffset::new(0, 2),
                    ChangesetOffset::new(2, 0),
                    ChangesetOffset::new(2, 7),
                    ChangesetOffset::new(9, 1),
                ])
                .unwrap();
            // An empty batch is a no-op.
            writer.append_many(&[]).unwrap();
            writer.sync().unwrap();
            assert_eq!(writer.len(), 4);
        }
        assert_eq!(file_len(&path), 64);

        let mut reader = ChangesetOffsetReader::new(&path, 4).unwrap();
        assert_eq!(
            pairs(&reader.get_range(0, 4).unwrap()),
            vec![(0, 2), (2, 0), (2, 7), (9, 1)]
        );
        assert_eq!(pairs(&reader.get_range(1, 3).unwrap()), vec![(2, 0), (2, 7)]);
        assert_eq!(pairs(&reader.get_range(3, 100).unwrap()), vec![(9, 1)]);
        assert!(reader.get_range(3, 3).unwrap().is_empty());
        assert!(reader.get_range(5, 2).unwrap().is_empty());

        // Random access after a range read still seeks to the right record.
        let entry = reader.get(1).unwrap().unwrap();
        assert_eq!((entry.offset(), entry.num_changes()), (2, 0));
    }

    #[test]
    fn test_truncate() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
        writer.append(&ChangesetOffset::new(0, 1)).unwrap();
        writer.append(&ChangesetOffset::new(1, 2)).unwrap();
        writer.append(&ChangesetOffset::new(3, 3)).unwrap();
        writer.sync().unwrap();

        writer.truncate(2).unwrap();
        assert_eq!(writer.len(), 2);

        let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap();
        assert_eq!(reader.len(), 2);
        assert!(reader.get(2).unwrap().is_none());
    }

    #[test]
    fn test_append_after_truncate() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
        writer
            .append_many(&[
                ChangesetOffset::new(0, 1),
                ChangesetOffset::new(1, 2),
                ChangesetOffset::new(3, 3),
            ])
            .unwrap();
        writer.truncate(1).unwrap();

        // The next record must land directly after the kept one, not after the old tail.
        writer.append(&ChangesetOffset::new(1, 9)).unwrap();
        writer.sync().unwrap();
        assert_eq!(writer.len(), 2);
        assert_eq!(file_len(&path), 32);

        let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap();
        assert_eq!(pairs(&reader.get_range(0, 2).unwrap()), vec![(0, 1), (1, 9)]);
    }

    #[test]
    fn test_partial_record_recovery() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Write 1 full record (16 bytes) + 8 trailing bytes (partial record)
        {
            let mut file = std::fs::File::create(&path).unwrap();
            // Full record: offset=100, num_changes=5
            file.write_all(&100u64.to_le_bytes()).unwrap();
            file.write_all(&5u64.to_le_bytes()).unwrap();
            // Partial record: only 8 bytes (incomplete)
            file.write_all(&200u64.to_le_bytes()).unwrap();
            file.sync_all().unwrap();
        }

        // Verify file has 24 bytes before opening with writer
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 24);

        // Open with writer, committed_len=1 (header committed 1 record)
        // Should truncate partial record and match committed length
        let writer = ChangesetOffsetWriter::new(&path, 1).unwrap();
        assert_eq!(writer.len(), 1);

        // Verify file was truncated to 16 bytes
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 16);

        // Verify the complete record is readable
        let mut reader = ChangesetOffsetReader::new(&path, 1).unwrap();
        assert_eq!(reader.len(), 1);
        let entry = reader.get(0).unwrap().unwrap();
        assert_eq!(entry.offset(), 100);
        assert_eq!(entry.num_changes(), 5);
    }

    #[test]
    fn test_partial_and_uncommitted_records_healed_together() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Two complete records plus a 5-byte partial tail, but only one record committed.
        {
            let mut file = std::fs::File::create(&path).unwrap();
            for value in [1u64, 2, 3, 4] {
                file.write_all(&value.to_le_bytes()).unwrap();
            }
            file.write_all(&[0xAB; 5]).unwrap();
            file.sync_all().unwrap();
        }
        assert_eq!(file_len(&path), 37);

        let writer = ChangesetOffsetWriter::new(&path, 1).unwrap();
        assert_eq!(writer.len(), 1);
        assert_eq!(file_len(&path), 16);

        let mut reader = ChangesetOffsetReader::new(&path, 1).unwrap();
        let entry = reader.get(0).unwrap().unwrap();
        assert_eq!((entry.offset(), entry.num_changes()), (1, 2));
    }

    #[test]
    fn test_len_bounds_reads() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Write 3 records
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
            writer.append(&ChangesetOffset::new(0, 10)).unwrap();
            writer.append(&ChangesetOffset::new(10, 20)).unwrap();
            writer.append(&ChangesetOffset::new(30, 30)).unwrap();
            writer.sync().unwrap();
            assert_eq!(writer.len(), 3);
        }

        // Open with len=2, ignoring the 3rd record
        let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap();
        assert_eq!(reader.len(), 2);

        // First two records should be readable
        let entry0 = reader.get(0).unwrap().unwrap();
        assert_eq!(entry0.offset(), 0);
        assert_eq!(entry0.num_changes(), 10);

        let entry1 = reader.get(1).unwrap().unwrap();
        assert_eq!(entry1.offset(), 10);
        assert_eq!(entry1.num_changes(), 20);

        // Third record should be out of bounds (due to len=2)
        assert!(reader.get(2).unwrap().is_none());

        // get_range should also respect the len bound
        let range = reader.get_range(0, 5).unwrap();
        assert_eq!(range.len(), 2);
    }

    #[test]
    fn test_truncate_uncommitted_records_on_open() {
        // Simulates crash recovery where sidecar has more records than committed header length.
        // ChangesetOffsetWriter::new() should automatically truncate to committed_len.
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Simulate: wrote 3 records, synced sidecar, but header only committed len=2
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
            writer.append(&ChangesetOffset::new(0, 5)).unwrap();
            writer.append(&ChangesetOffset::new(5, 10)).unwrap();
            writer.append(&ChangesetOffset::new(15, 7)).unwrap(); // uncommitted
            writer.sync().unwrap();
            assert_eq!(writer.len(), 3);
        }

        // On "restart", new() heals by truncating to committed length
        let committed_len = 2u64;
        {
            let writer = ChangesetOffsetWriter::new(&path, committed_len).unwrap();
            assert_eq!(writer.len(), 2); // Healed to committed length
        }

        // Verify file is now correct length and new appends go to the right place
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 2).unwrap();
            assert_eq!(writer.len(), 2);

            // Append a new record - should be at index 2, not index 3
            writer.append(&ChangesetOffset::new(15, 20)).unwrap();
            writer.sync().unwrap();
            assert_eq!(writer.len(), 3);
        }

        // Verify the records are correct
        {
            let mut reader = ChangesetOffsetReader::new(&path, 3).unwrap();
            assert_eq!(reader.len(), 3);

            let entry0 = reader.get(0).unwrap().unwrap();
            assert_eq!(entry0.offset(), 0);
            assert_eq!(entry0.num_changes(), 5);

            let entry1 = reader.get(1).unwrap().unwrap();
            assert_eq!(entry1.offset(), 5);
            assert_eq!(entry1.num_changes(), 10);

            // This should be the NEW record, not the old uncommitted one
            let entry2 = reader.get(2).unwrap().unwrap();
            assert_eq!(entry2.offset(), 15);
            assert_eq!(entry2.num_changes(), 20); // Not 7 from the old uncommitted record
        }
    }

    #[test]
    fn test_sidecar_shorter_than_committed_errors() {
        // If sidecar has fewer records than committed, it's data corruption - should error.
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Write 1 record
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
            writer.append(&ChangesetOffset::new(0, 5)).unwrap();
            writer.sync().unwrap();
        }

        // Try to open with committed_len=3 (header claims more than file has)
        let result = ChangesetOffsetWriter::new(&path, 3);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);

        // The corrupt sidecar must not have been modified.
        assert_eq!(file_len(&path), 16);
    }
}