1use crate::ChangesetOffset;
7use std::{
8 fs::{File, OpenOptions},
9 io::{self, Read, Seek, SeekFrom, Write},
10 path::Path,
11};
12
13#[derive(Debug)]
15pub struct ChangesetOffsetWriter {
16 file: File,
17 records_written: u64,
19}
20
21impl ChangesetOffsetWriter {
22 const RECORD_SIZE: usize = 16;
24
25 pub fn new(path: impl AsRef<Path>, committed_len: u64) -> io::Result<Self> {
35 let file = OpenOptions::new()
36 .create(true)
37 .truncate(false)
38 .read(true)
39 .write(true)
40 .open(path.as_ref())?;
41
42 let file_len = file.metadata()?.len();
43 let remainder = file_len % Self::RECORD_SIZE as u64;
44
45 let aligned_len = if remainder != 0 {
47 let truncated_len = file_len - remainder;
48 tracing::warn!(
49 target: "reth::static_file",
50 path = %path.as_ref().display(),
51 original_len = file_len,
52 truncated_len,
53 "Truncating partial changeset offset record"
54 );
55 file.set_len(truncated_len)?;
56 file.sync_all()?; truncated_len
58 } else {
59 file_len
60 };
61
62 let records_in_file = aligned_len / Self::RECORD_SIZE as u64;
63
64 match records_in_file.cmp(&committed_len) {
66 std::cmp::Ordering::Greater => {
67 let target_len = committed_len * Self::RECORD_SIZE as u64;
69 tracing::warn!(
70 target: "reth::static_file",
71 path = %path.as_ref().display(),
72 sidecar_records = records_in_file,
73 committed_len,
74 "Truncating uncommitted changeset offset records after crash recovery"
75 );
76 file.set_len(target_len)?;
77 file.sync_all()?; }
79 std::cmp::Ordering::Less => {
80 return Err(io::Error::new(
92 io::ErrorKind::InvalidData,
93 format!(
94 "INVARIANT VIOLATION: Changeset offset sidecar has {} records but header expects {} \
95 (healing should have prevented this - possible bug in healing logic): {}",
96 records_in_file,
97 committed_len,
98 path.as_ref().display()
99 ),
100 ));
101 }
102 std::cmp::Ordering::Equal => {}
103 }
104
105 let records_written = committed_len;
106 let file = OpenOptions::new().create(true).append(true).open(path)?;
107
108 Ok(Self { file, records_written })
109 }
110
111 pub fn append(&mut self, offset: &ChangesetOffset) -> io::Result<()> {
113 let mut buf = [0u8; Self::RECORD_SIZE];
114 buf[..8].copy_from_slice(&offset.offset().to_le_bytes());
115 buf[8..].copy_from_slice(&offset.num_changes().to_le_bytes());
116 self.file.write_all(&buf)?;
117 self.records_written += 1;
118 Ok(())
119 }
120
121 pub fn append_many(&mut self, offsets: &[ChangesetOffset]) -> io::Result<()> {
123 for offset in offsets {
124 self.append(offset)?;
125 }
126 Ok(())
127 }
128
129 pub fn sync(&mut self) -> io::Result<()> {
131 self.file.sync_all()
132 }
133
134 pub fn truncate(&mut self, len: u64) -> io::Result<()> {
140 self.file.set_len(len * Self::RECORD_SIZE as u64)?;
141 self.file.sync_all()?;
142 self.records_written = len;
143 Ok(())
144 }
145
146 pub const fn len(&self) -> u64 {
148 self.records_written
149 }
150
151 pub const fn is_empty(&self) -> bool {
153 self.records_written == 0
154 }
155}
156
157#[derive(Debug)]
159pub struct ChangesetOffsetReader {
160 file: File,
161 len: u64,
163}
164
165impl ChangesetOffsetReader {
166 const RECORD_SIZE: usize = 16;
168
169 pub fn new(path: impl AsRef<Path>, len: u64) -> io::Result<Self> {
174 let file = File::open(path)?;
175 Ok(Self { file, len })
176 }
177
178 pub fn get(&mut self, block_index: u64) -> io::Result<Option<ChangesetOffset>> {
181 if block_index >= self.len {
182 return Ok(None);
183 }
184
185 let byte_pos = block_index * Self::RECORD_SIZE as u64;
186 self.file.seek(SeekFrom::Start(byte_pos))?;
187
188 let mut buf = [0u8; Self::RECORD_SIZE];
189 self.file.read_exact(&mut buf)?;
190
191 let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
192 let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
193
194 Ok(Some(ChangesetOffset::new(offset, num_changes)))
195 }
196
197 pub fn get_range(&mut self, start: u64, end: u64) -> io::Result<Vec<ChangesetOffset>> {
199 let end = end.min(self.len);
200 if start >= end {
201 return Ok(Vec::new());
202 }
203
204 let count = (end - start) as usize;
205 let byte_pos = start * Self::RECORD_SIZE as u64;
206 self.file.seek(SeekFrom::Start(byte_pos))?;
207
208 let mut result = Vec::with_capacity(count);
209 let mut buf = [0u8; Self::RECORD_SIZE];
210
211 for _ in 0..count {
212 self.file.read_exact(&mut buf)?;
213 let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
214 let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
215 result.push(ChangesetOffset::new(offset, num_changes));
216 }
217
218 Ok(result)
219 }
220
221 pub const fn len(&self) -> u64 {
223 self.len
224 }
225
226 pub const fn is_empty(&self) -> bool {
228 self.len == 0
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// File name used for the sidecar inside each test's temporary directory.
    const SIDECAR_NAME: &str = "test.csoff";

    /// Opens a writer with `committed_len`, appends every `(offset, num_changes)` pair one at a
    /// time, syncs, and hands the writer back so the caller controls when it is dropped.
    fn write_records(
        path: &std::path::Path,
        committed_len: u64,
        records: &[(u64, u64)],
    ) -> ChangesetOffsetWriter {
        let mut writer = ChangesetOffsetWriter::new(path, committed_len).unwrap();
        for &(offset, num_changes) in records {
            writer.append(&ChangesetOffset::new(offset, num_changes)).unwrap();
        }
        writer.sync().unwrap();
        writer
    }

    /// Asserts that records `0..expected.len()` read back as the given
    /// `(offset, num_changes)` pairs.
    fn assert_records(reader: &mut ChangesetOffsetReader, expected: &[(u64, u64)]) {
        for (index, &(offset, num_changes)) in expected.iter().enumerate() {
            let entry = reader.get(index as u64).unwrap().unwrap();
            assert_eq!(entry.offset(), offset);
            assert_eq!(entry.num_changes(), num_changes);
        }
    }

    /// Records appended by a writer are read back unchanged, and the index one past the end
    /// yields `None`.
    #[test]
    fn test_write_and_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join(SIDECAR_NAME);
        let records = [(0, 5), (5, 3), (8, 10)];

        let writer = write_records(&path, 0, &records);
        assert_eq!(writer.len(), 3);
        drop(writer);

        let mut reader = ChangesetOffsetReader::new(&path, 3).unwrap();
        assert_eq!(reader.len(), 3);
        assert_records(&mut reader, &records);
        assert!(reader.get(3).unwrap().is_none());
    }

    /// Truncating the writer lowers its record count, and a reader opened with the new count
    /// reports nothing at the removed index.
    #[test]
    fn test_truncate() {
        let dir = tempdir().unwrap();
        let path = dir.path().join(SIDECAR_NAME);

        let mut writer = write_records(&path, 0, &[(0, 1), (1, 2), (3, 3)]);
        writer.truncate(2).unwrap();
        assert_eq!(writer.len(), 2);

        let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap();
        assert_eq!(reader.len(), 2);
        assert!(reader.get(2).unwrap().is_none());
    }

    /// A file ending in half a record is cut back to the last full record when opened.
    #[test]
    fn test_partial_record_recovery() {
        let dir = tempdir().unwrap();
        let path = dir.path().join(SIDECAR_NAME);

        {
            // One complete record (100, 5) followed by only the first half of a second one.
            let mut file = std::fs::File::create(&path).unwrap();
            for word in [100u64, 5, 200] {
                file.write_all(&word.to_le_bytes()).unwrap();
            }
            file.sync_all().unwrap();
        }

        let size_on_disk = || std::fs::metadata(&path).unwrap().len();
        assert_eq!(size_on_disk(), 24);

        let writer = ChangesetOffsetWriter::new(&path, 1).unwrap();
        assert_eq!(writer.len(), 1);
        assert_eq!(size_on_disk(), 16);

        let mut reader = ChangesetOffsetReader::new(&path, 1).unwrap();
        assert_eq!(reader.len(), 1);
        assert_records(&mut reader, &[(100, 5)]);
    }

    /// The reader's declared length, not the physical file size, bounds what can be read.
    #[test]
    fn test_len_bounds_reads() {
        let dir = tempdir().unwrap();
        let path = dir.path().join(SIDECAR_NAME);

        let writer = write_records(&path, 0, &[(0, 10), (10, 20), (30, 30)]);
        assert_eq!(writer.len(), 3);
        drop(writer);

        // Three records are on disk, but only two are declared valid.
        let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap();
        assert_eq!(reader.len(), 2);
        assert_records(&mut reader, &[(0, 10), (10, 20)]);
        assert!(reader.get(2).unwrap().is_none());

        let range = reader.get_range(0, 5).unwrap();
        assert_eq!(range.len(), 2);
    }

    /// Records beyond the committed length are dropped on open, and later appends take their
    /// place.
    #[test]
    fn test_truncate_uncommitted_records_on_open() {
        let dir = tempdir().unwrap();
        let path = dir.path().join(SIDECAR_NAME);

        let writer = write_records(&path, 0, &[(0, 5), (5, 10), (15, 7)]);
        assert_eq!(writer.len(), 3);
        drop(writer);

        // Reopen claiming only two records were committed.
        let committed_len = 2u64;
        {
            let writer = ChangesetOffsetWriter::new(&path, committed_len).unwrap();
            assert_eq!(writer.len(), 2);
        }

        {
            let mut writer = ChangesetOffsetWriter::new(&path, 2).unwrap();
            assert_eq!(writer.len(), 2);

            writer.append(&ChangesetOffset::new(15, 20)).unwrap();
            writer.sync().unwrap();
            assert_eq!(writer.len(), 3);
        }

        // The third slot must hold the newly appended record, not the discarded (15, 7).
        let mut reader = ChangesetOffsetReader::new(&path, 3).unwrap();
        assert_eq!(reader.len(), 3);
        assert_records(&mut reader, &[(0, 5), (5, 10), (15, 20)]);
    }

    /// Opening with a committed length larger than what is on disk fails with `InvalidData`.
    #[test]
    fn test_sidecar_shorter_than_committed_errors() {
        let dir = tempdir().unwrap();
        let path = dir.path().join(SIDECAR_NAME);

        drop(write_records(&path, 0, &[(0, 5)]));

        let result = ChangesetOffsetWriter::new(&path, 3);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }
}