reth_era/
e2s_file.rs

//! `E2Store` file reader and writer
//!
//! See also <https://github.com/status-im/nimbus-eth2/blob/stable/docs/e2store.md>
4
5use crate::e2s_types::{E2sError, Entry, Version};
6use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
7
/// A reader for `E2Store` files that wraps a [`BufReader`].
///
/// The wrapped source is read through the internal buffer; the methods that
/// rewind the stream additionally require the source to implement `Seek`.
#[derive(Debug)]
pub struct E2StoreReader<R: Read> {
    /// Buffered reader over the underlying source
    reader: BufReader<R>,
}
15
16impl<R: Read + Seek> E2StoreReader<R> {
17    /// Create a new [`E2StoreReader`]
18    pub fn new(reader: R) -> Self {
19        Self { reader: BufReader::new(reader) }
20    }
21
22    /// Read and validate the version record
23    pub fn read_version(&mut self) -> Result<Option<Entry>, E2sError> {
24        // Reset reader to beginning
25        self.reader.seek(SeekFrom::Start(0))?;
26
27        match Entry::read(&mut self.reader)? {
28            Some(entry) if entry.is_version() => Ok(Some(entry)),
29            Some(_) => Err(E2sError::Ssz("First entry must be a Version entry".to_string())),
30            None => Ok(None),
31        }
32    }
33
34    /// Read the next entry from the file
35    pub fn read_next_entry(&mut self) -> Result<Option<Entry>, E2sError> {
36        Entry::read(&mut self.reader)
37    }
38
39    /// Iterate through all entries, including the version entry
40    pub fn entries(&mut self) -> Result<Vec<Entry>, E2sError> {
41        // Reset reader to beginning
42        self.reader.seek(SeekFrom::Start(0))?;
43
44        let mut entries = Vec::new();
45
46        while let Some(entry) = self.read_next_entry()? {
47            entries.push(entry);
48        }
49
50        Ok(entries)
51    }
52}
53
/// A writer for `E2Store` files that wraps a [`BufWriter`].
#[derive(Debug)]
pub struct E2StoreWriter<W>
where
    W: Write,
{
    /// Buffered writer over the underlying sink
    writer: BufWriter<W>,
    /// Set once this writer has emitted the version entry
    has_written_version: bool,
}
62
63impl<W: Write> E2StoreWriter<W> {
64    /// Create a new [`E2StoreWriter`]
65    pub fn new(writer: W) -> Self {
66        Self { writer: BufWriter::new(writer), has_written_version: false }
67    }
68
69    /// Create a new [`E2StoreWriter`] and write the version entry
70    pub fn with_version(writer: W) -> Result<Self, E2sError> {
71        let mut writer = Self::new(writer);
72        writer.write_version()?;
73        Ok(writer)
74    }
75
76    /// Write the version entry as the first entry in the file.
77    /// This must be called before writing any other entries.
78    pub fn write_version(&mut self) -> Result<(), E2sError> {
79        if self.has_written_version {
80            return Ok(());
81        }
82
83        let version = Version;
84        version.encode(&mut self.writer)?;
85        self.has_written_version = true;
86        Ok(())
87    }
88
89    /// Write an entry to the file.
90    /// If a version entry has not been written yet, it will be added.
91    pub fn write_entry(&mut self, entry: &Entry) -> Result<(), E2sError> {
92        if !self.has_written_version {
93            self.write_version()?;
94        }
95
96        entry.write(&mut self.writer)?;
97        Ok(())
98    }
99
100    /// Flush any buffered data to the underlying writer
101    pub fn flush(&mut self) -> Result<(), E2sError> {
102        self.writer.flush().map_err(E2sError::Io)
103    }
104}
105
106#[cfg(test)]
107mod tests {
108    use super::*;
109    use crate::e2s_types::{SLOT_INDEX, VERSION};
110    use std::io::Cursor;
111
112    fn create_slot_index_data(starting_slot: u64, offsets: &[i64]) -> Vec<u8> {
113        // Format: starting-slot | index | index | index ... | count
114        let mut data = Vec::with_capacity(8 + offsets.len() * 8 + 8);
115
116        // Add starting slot
117        data.extend_from_slice(&starting_slot.to_le_bytes());
118
119        // Add all offsets
120        for offset in offsets {
121            data.extend_from_slice(&offset.to_le_bytes());
122        }
123
124        // Add count
125        data.extend_from_slice(&(offsets.len() as i64).to_le_bytes());
126
127        data
128    }
129
130    #[test]
131    fn test_e2store_reader() -> Result<(), E2sError> {
132        // Create a mock e2store file in memory
133        let mut mock_file = Vec::new();
134
135        let version_entry = Entry::new(VERSION, Vec::new());
136        version_entry.write(&mut mock_file)?;
137
138        let slot_index_entry1 = Entry::new(SLOT_INDEX, create_slot_index_data(1, &[1024]));
139        slot_index_entry1.write(&mut mock_file)?;
140
141        let slot_index_entry2 = Entry::new(SLOT_INDEX, create_slot_index_data(2, &[2048]));
142        slot_index_entry2.write(&mut mock_file)?;
143
144        let custom_type = [0x99, 0x99];
145        let custom_entry = Entry::new(custom_type, vec![10, 11, 12]);
146        custom_entry.write(&mut mock_file)?;
147
148        let cursor = Cursor::new(mock_file);
149        let mut e2store_reader = E2StoreReader::new(cursor);
150
151        let version = e2store_reader.read_version()?;
152        assert!(version.is_some());
153
154        let entries = e2store_reader.entries()?;
155
156        // Validate entries
157        assert_eq!(entries.len(), 4);
158        // First entry should be version
159        assert!(entries[0].is_version());
160        // Second entry should be slot index
161        assert!(entries[1].is_slot_index());
162        // Third entry is slot index
163        assert!(entries[2].is_slot_index());
164        // Fourth entry is custom type
165        assert!(entries[3].entry_type == [0x99, 0x99]);
166
167        Ok(())
168    }
169
170    #[test]
171    fn test_slot_index_with_multiple_offsets() -> Result<(), E2sError> {
172        let starting_slot = 100;
173        let offsets = &[1024, 2048, 0, 0, 3072, 0, 4096];
174
175        let slot_index_data = create_slot_index_data(starting_slot, offsets);
176
177        let slot_index_entry = Entry::new(SLOT_INDEX, slot_index_data.clone());
178
179        // Verify the slot index data format
180        assert_eq!(slot_index_data.len(), 8 + offsets.len() * 8 + 8);
181
182        // Check the starting slot
183        let mut starting_slot_bytes = [0u8; 8];
184        starting_slot_bytes.copy_from_slice(&slot_index_data[0..8]);
185        assert_eq!(u64::from_le_bytes(starting_slot_bytes), starting_slot);
186
187        // Check the count at the end
188        let mut count_bytes = [0u8; 8];
189        count_bytes.copy_from_slice(&slot_index_data[slot_index_data.len() - 8..]);
190        assert_eq!(i64::from_le_bytes(count_bytes), offsets.len() as i64);
191
192        // Verify we can write and read it back
193        let mut buffer = Vec::new();
194        slot_index_entry.write(&mut buffer)?;
195
196        let cursor = Cursor::new(buffer);
197        let mut reader = E2StoreReader::new(cursor);
198
199        let read_entry = reader.read_next_entry()?.unwrap();
200        assert!(read_entry.is_slot_index());
201
202        assert_eq!(read_entry.data.len(), slot_index_data.len());
203
204        Ok(())
205    }
206
207    #[test]
208    fn test_empty_file() -> Result<(), E2sError> {
209        // Create an empty file
210        let mock_file = Vec::new();
211
212        // Create reader
213        let cursor = Cursor::new(mock_file);
214        let mut e2store_reader = E2StoreReader::new(cursor);
215
216        // Reading version should return None
217        let version = e2store_reader.read_version()?;
218        assert!(version.is_none());
219
220        // Entries should be empty
221        let entries = e2store_reader.entries()?;
222        assert!(entries.is_empty());
223
224        Ok(())
225    }
226
227    #[test]
228    fn test_read_next_entry() -> Result<(), E2sError> {
229        let mut mock_file = Vec::new();
230
231        let version_entry = Entry::new(VERSION, Vec::new());
232        version_entry.write(&mut mock_file)?;
233
234        let slot_entry = Entry::new(SLOT_INDEX, create_slot_index_data(1, &[1024]));
235        slot_entry.write(&mut mock_file)?;
236
237        let cursor = Cursor::new(mock_file);
238        let mut reader = E2StoreReader::new(cursor);
239
240        let first = reader.read_next_entry()?.unwrap();
241        assert!(first.is_version());
242
243        let second = reader.read_next_entry()?.unwrap();
244        assert!(second.is_slot_index());
245
246        let third = reader.read_next_entry()?;
247        assert!(third.is_none());
248
249        Ok(())
250    }
251
252    #[test]
253    fn test_e2store_writer() -> Result<(), E2sError> {
254        let mut buffer = Vec::new();
255
256        {
257            let mut writer = E2StoreWriter::new(&mut buffer);
258
259            // Write version entry
260            writer.write_version()?;
261
262            // Write a block index entry
263            let block_entry = Entry::new(SLOT_INDEX, create_slot_index_data(1, &[1024]));
264            writer.write_entry(&block_entry)?;
265
266            // Write a custom entry
267            let custom_type = [0x99, 0x99];
268            let custom_entry = Entry::new(custom_type, vec![10, 11, 12]);
269            writer.write_entry(&custom_entry)?;
270
271            writer.flush()?;
272        }
273
274        let cursor = Cursor::new(&buffer);
275        let mut reader = E2StoreReader::new(cursor);
276
277        let entries = reader.entries()?;
278        assert_eq!(entries.len(), 3);
279        assert!(entries[0].is_version());
280        assert!(entries[1].is_slot_index());
281        assert_eq!(entries[2].entry_type, [0x99, 0x99]);
282
283        Ok(())
284    }
285
286    #[test]
287    fn test_writer_implicit_version_insertion() -> Result<(), E2sError> {
288        let mut buffer = Vec::new();
289
290        {
291            // Writer without explicitly writing the version
292            let mut writer = E2StoreWriter::new(&mut buffer);
293
294            // Write an entry, it should automatically add a version first
295            let custom_type = [0x42, 0x42];
296            let custom_entry = Entry::new(custom_type, vec![1, 2, 3, 4]);
297            writer.write_entry(&custom_entry)?;
298
299            // Write another entry
300            let another_custom = Entry::new([0x43, 0x43], vec![5, 6, 7, 8]);
301            writer.write_entry(&another_custom)?;
302
303            writer.flush()?;
304        }
305
306        let cursor = Cursor::new(&buffer);
307        let mut reader = E2StoreReader::new(cursor);
308
309        let version = reader.read_version()?;
310        assert!(version.is_some(), "Version entry should have been auto-added");
311
312        let entries = reader.entries()?;
313        assert_eq!(entries.len(), 3);
314        assert!(entries[0].is_version());
315        assert_eq!(entries[1].entry_type, [0x42, 0x42]);
316        assert_eq!(entries[1].data, vec![1, 2, 3, 4]);
317        assert_eq!(entries[2].entry_type, [0x43, 0x43]);
318        assert_eq!(entries[2].data, vec![5, 6, 7, 8]);
319
320        Ok(())
321    }
322
323    #[test]
324    fn test_writer_prevents_duplicate_versions() -> Result<(), E2sError> {
325        let mut buffer = Vec::new();
326
327        {
328            let mut writer = E2StoreWriter::new(&mut buffer);
329
330            // Call write_version multiple times, it should only write once
331            writer.write_version()?;
332            writer.write_version()?;
333            writer.write_version()?;
334
335            // Write an entry
336            let block_entry = Entry::new(SLOT_INDEX, create_slot_index_data(42, &[8192]));
337            writer.write_entry(&block_entry)?;
338
339            writer.flush()?;
340        }
341
342        // Verify only one version entry was written
343        let cursor = Cursor::new(&buffer);
344        let mut reader = E2StoreReader::new(cursor);
345
346        let entries = reader.entries()?;
347        assert_eq!(entries.len(), 2);
348        assert!(entries[0].is_version());
349        assert!(entries[1].is_slot_index());
350
351        Ok(())
352    }
353
354    #[test]
355    fn test_e2store_multiple_roundtrip_conversions() -> Result<(), E2sError> {
356        // Initial set of entries to test with varied types and sizes
357        let entry1 = Entry::new([0x01, 0x01], vec![1, 2, 3, 4, 5]);
358        let entry2 = Entry::new([0x02, 0x02], vec![10, 20, 30, 40, 50]);
359        let entry3 = Entry::new(SLOT_INDEX, create_slot_index_data(123, &[45678]));
360        let entry4 = Entry::new([0xFF, 0xFF], Vec::new());
361
362        println!("Initial entries count: 4");
363
364        // First write cycle : create initial buffer
365        let mut buffer1 = Vec::new();
366        {
367            let mut writer = E2StoreWriter::new(&mut buffer1);
368            writer.write_version()?;
369            writer.write_entry(&entry1)?;
370            writer.write_entry(&entry2)?;
371            writer.write_entry(&entry3)?;
372            writer.write_entry(&entry4)?;
373            writer.flush()?;
374        }
375
376        // First read cycle : read from initial buffer
377        let mut reader1 = E2StoreReader::new(Cursor::new(&buffer1));
378        let entries1 = reader1.entries()?;
379
380        println!("First read entries:");
381        for (i, entry) in entries1.iter().enumerate() {
382            println!("Entry {}: type {:?}, data len {}", i, entry.entry_type, entry.data.len());
383        }
384        println!("First read entries count: {}", entries1.len());
385
386        // Verify first read content
387        assert_eq!(entries1.len(), 5, "Should have 5 entries (version + 4 data)");
388        assert!(entries1[0].is_version());
389        assert_eq!(entries1[1].entry_type, [0x01, 0x01]);
390        assert_eq!(entries1[1].data, vec![1, 2, 3, 4, 5]);
391        assert_eq!(entries1[2].entry_type, [0x02, 0x02]);
392        assert_eq!(entries1[2].data, vec![10, 20, 30, 40, 50]);
393        assert!(entries1[3].is_slot_index());
394        assert_eq!(entries1[4].entry_type, [0xFF, 0xFF]);
395        assert_eq!(entries1[4].data.len(), 0);
396
397        // Second write cycle : write what we just read
398        let mut buffer2 = Vec::new();
399        {
400            let mut writer = E2StoreWriter::new(&mut buffer2);
401            // Only write version once
402            writer.write_version()?;
403
404            // Skip the first entry ie the version since we already wrote it
405            for entry in &entries1[1..] {
406                writer.write_entry(entry)?;
407            }
408            writer.flush()?;
409        }
410
411        // Second read cycle - read the second buffer
412        let mut reader2 = E2StoreReader::new(Cursor::new(&buffer2));
413        let entries2 = reader2.entries()?;
414
415        // Verify second read matches first read
416        assert_eq!(entries1.len(), entries2.len());
417        for i in 0..entries1.len() {
418            assert_eq!(entries1[i].entry_type, entries2[i].entry_type);
419            assert_eq!(entries1[i].data, entries2[i].data);
420        }
421
422        Ok(())
423    }
424}