reth_era/
e2s_file.rs

1//! `E2Store` file reader
2//!
3//! See also <https://github.com/status-im/nimbus-eth2/blob/stable/docs/e2store.md>
4
5use crate::e2s_types::{E2sError, Entry, Version};
6use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
7
8/// A reader for `E2Store` files that wraps a [`BufReader`].
9
10#[derive(Debug)]
11pub struct E2StoreReader<R: Read> {
12    /// Buffered reader
13    reader: BufReader<R>,
14}
15
16impl<R: Read + Seek> E2StoreReader<R> {
17    /// Create a new [`E2StoreReader`]
18    pub fn new(reader: R) -> Self {
19        Self { reader: BufReader::new(reader) }
20    }
21
22    /// Read and validate the version record
23    pub fn read_version(&mut self) -> Result<Option<Entry>, E2sError> {
24        // Reset reader to beginning
25        self.reader.seek(SeekFrom::Start(0))?;
26
27        match Entry::read(&mut self.reader)? {
28            Some(entry) if entry.is_version() => Ok(Some(entry)),
29            Some(_) => Err(E2sError::Ssz("First entry must be a Version entry".to_string())),
30            None => Ok(None),
31        }
32    }
33
34    /// Read the next entry from the file
35    pub fn read_next_entry(&mut self) -> Result<Option<Entry>, E2sError> {
36        Entry::read(&mut self.reader)
37    }
38
39    /// Read all entries from the file, including the version entry
40    pub fn entries(&mut self) -> Result<Vec<Entry>, E2sError> {
41        // Reset reader to beginning
42        self.reader.seek(SeekFrom::Start(0))?;
43
44        let mut entries = Vec::new();
45
46        while let Some(entry) = self.read_next_entry()? {
47            entries.push(entry);
48        }
49
50        Ok(entries)
51    }
52}
53
54/// A writer for `E2Store` files that wraps a [`BufWriter`].
55#[derive(Debug)]
56pub struct E2StoreWriter<W: Write> {
57    /// Buffered writer
58    writer: BufWriter<W>,
59    /// Tracks whether this writer has written a version entry
60    has_written_version: bool,
61}
62
63impl<W: Write> E2StoreWriter<W> {
64    /// Create a new [`E2StoreWriter`]
65    pub fn new(writer: W) -> Self {
66        Self { writer: BufWriter::new(writer), has_written_version: false }
67    }
68
69    /// Create a new [`E2StoreWriter`] and write the version entry
70    pub fn with_version(writer: W) -> Result<Self, E2sError> {
71        let mut writer = Self::new(writer);
72        writer.write_version()?;
73        Ok(writer)
74    }
75
76    /// Write the version entry as the first entry in the file.
77    /// If not called explicitly, it will be written automatically before the first non-version
78    /// entry.
79    pub fn write_version(&mut self) -> Result<(), E2sError> {
80        if self.has_written_version {
81            return Ok(());
82        }
83
84        let version = Version;
85        version.encode(&mut self.writer)?;
86        self.has_written_version = true;
87        Ok(())
88    }
89
90    /// Write an entry to the file.
91    /// If a version entry has not been written yet, it will be added.
92    pub fn write_entry(&mut self, entry: &Entry) -> Result<(), E2sError> {
93        if !self.has_written_version {
94            self.write_version()?;
95        }
96
97        entry.write(&mut self.writer)?;
98        Ok(())
99    }
100
101    /// Flush any buffered data to the underlying writer
102    pub fn flush(&mut self) -> Result<(), E2sError> {
103        self.writer.flush().map_err(E2sError::Io)
104    }
105}
106
107#[cfg(test)]
108mod tests {
109    use super::*;
110    use crate::e2s_types::{SLOT_INDEX, VERSION};
111    use std::io::Cursor;
112
113    fn create_slot_index_data(starting_slot: u64, offsets: &[i64]) -> Vec<u8> {
114        // Format: starting-slot | index | index | index ... | count
115        let mut data = Vec::with_capacity(8 + offsets.len() * 8 + 8);
116
117        // Add starting slot
118        data.extend_from_slice(&starting_slot.to_le_bytes());
119
120        // Add all offsets
121        for offset in offsets {
122            data.extend_from_slice(&offset.to_le_bytes());
123        }
124
125        // Add count
126        data.extend_from_slice(&(offsets.len() as i64).to_le_bytes());
127
128        data
129    }
130
131    #[test]
132    fn test_e2store_reader() -> Result<(), E2sError> {
133        // Create a mock e2store file in memory
134        let mut mock_file = Vec::new();
135
136        let version_entry = Entry::new(VERSION, Vec::new());
137        version_entry.write(&mut mock_file)?;
138
139        let slot_index_entry1 = Entry::new(SLOT_INDEX, create_slot_index_data(1, &[1024]));
140        slot_index_entry1.write(&mut mock_file)?;
141
142        let slot_index_entry2 = Entry::new(SLOT_INDEX, create_slot_index_data(2, &[2048]));
143        slot_index_entry2.write(&mut mock_file)?;
144
145        let custom_type = [0x99, 0x99];
146        let custom_entry = Entry::new(custom_type, vec![10, 11, 12]);
147        custom_entry.write(&mut mock_file)?;
148
149        let cursor = Cursor::new(mock_file);
150        let mut e2store_reader = E2StoreReader::new(cursor);
151
152        let version = e2store_reader.read_version()?;
153        assert!(version.is_some());
154
155        let entries = e2store_reader.entries()?;
156
157        // Validate entries
158        assert_eq!(entries.len(), 4);
159        // First entry should be version
160        assert!(entries[0].is_version());
161        // Second entry should be slot index
162        assert!(entries[1].is_slot_index());
163        // Third entry is slot index
164        assert!(entries[2].is_slot_index());
165        // Fourth entry is custom type
166        assert!(entries[3].entry_type == [0x99, 0x99]);
167
168        Ok(())
169    }
170
171    #[test]
172    fn test_slot_index_with_multiple_offsets() -> Result<(), E2sError> {
173        let starting_slot = 100;
174        let offsets = &[1024, 2048, 0, 0, 3072, 0, 4096];
175
176        let slot_index_data = create_slot_index_data(starting_slot, offsets);
177
178        let slot_index_entry = Entry::new(SLOT_INDEX, slot_index_data.clone());
179
180        // Verify the slot index data format
181        assert_eq!(slot_index_data.len(), 8 + offsets.len() * 8 + 8);
182
183        // Check the starting slot
184        let mut starting_slot_bytes = [0u8; 8];
185        starting_slot_bytes.copy_from_slice(&slot_index_data[0..8]);
186        assert_eq!(u64::from_le_bytes(starting_slot_bytes), starting_slot);
187
188        // Check the count at the end
189        let mut count_bytes = [0u8; 8];
190        count_bytes.copy_from_slice(&slot_index_data[slot_index_data.len() - 8..]);
191        assert_eq!(i64::from_le_bytes(count_bytes), offsets.len() as i64);
192
193        // Verify we can write and read it back
194        let mut buffer = Vec::new();
195        slot_index_entry.write(&mut buffer)?;
196
197        let cursor = Cursor::new(buffer);
198        let mut reader = E2StoreReader::new(cursor);
199
200        let read_entry = reader.read_next_entry()?.unwrap();
201        assert!(read_entry.is_slot_index());
202
203        assert_eq!(read_entry.data.len(), slot_index_data.len());
204
205        Ok(())
206    }
207
208    #[test]
209    fn test_empty_file() -> Result<(), E2sError> {
210        // Create an empty file
211        let mock_file = Vec::new();
212
213        // Create reader
214        let cursor = Cursor::new(mock_file);
215        let mut e2store_reader = E2StoreReader::new(cursor);
216
217        // Reading version should return None
218        let version = e2store_reader.read_version()?;
219        assert!(version.is_none());
220
221        // Entries should be empty
222        let entries = e2store_reader.entries()?;
223        assert!(entries.is_empty());
224
225        Ok(())
226    }
227
228    #[test]
229    fn test_read_next_entry() -> Result<(), E2sError> {
230        let mut mock_file = Vec::new();
231
232        let version_entry = Entry::new(VERSION, Vec::new());
233        version_entry.write(&mut mock_file)?;
234
235        let slot_entry = Entry::new(SLOT_INDEX, create_slot_index_data(1, &[1024]));
236        slot_entry.write(&mut mock_file)?;
237
238        let cursor = Cursor::new(mock_file);
239        let mut reader = E2StoreReader::new(cursor);
240
241        let first = reader.read_next_entry()?.unwrap();
242        assert!(first.is_version());
243
244        let second = reader.read_next_entry()?.unwrap();
245        assert!(second.is_slot_index());
246
247        let third = reader.read_next_entry()?;
248        assert!(third.is_none());
249
250        Ok(())
251    }
252
253    #[test]
254    fn test_e2store_writer() -> Result<(), E2sError> {
255        let mut buffer = Vec::new();
256
257        {
258            let mut writer = E2StoreWriter::new(&mut buffer);
259
260            // Write version entry
261            writer.write_version()?;
262
263            // Write a block index entry
264            let block_entry = Entry::new(SLOT_INDEX, create_slot_index_data(1, &[1024]));
265            writer.write_entry(&block_entry)?;
266
267            // Write a custom entry
268            let custom_type = [0x99, 0x99];
269            let custom_entry = Entry::new(custom_type, vec![10, 11, 12]);
270            writer.write_entry(&custom_entry)?;
271
272            writer.flush()?;
273        }
274
275        let cursor = Cursor::new(&buffer);
276        let mut reader = E2StoreReader::new(cursor);
277
278        let entries = reader.entries()?;
279        assert_eq!(entries.len(), 3);
280        assert!(entries[0].is_version());
281        assert!(entries[1].is_slot_index());
282        assert_eq!(entries[2].entry_type, [0x99, 0x99]);
283
284        Ok(())
285    }
286
287    #[test]
288    fn test_writer_implicit_version_insertion() -> Result<(), E2sError> {
289        let mut buffer = Vec::new();
290
291        {
292            // Writer without explicitly writing the version
293            let mut writer = E2StoreWriter::new(&mut buffer);
294
295            // Write an entry, it should automatically add a version first
296            let custom_type = [0x42, 0x42];
297            let custom_entry = Entry::new(custom_type, vec![1, 2, 3, 4]);
298            writer.write_entry(&custom_entry)?;
299
300            // Write another entry
301            let another_custom = Entry::new([0x43, 0x43], vec![5, 6, 7, 8]);
302            writer.write_entry(&another_custom)?;
303
304            writer.flush()?;
305        }
306
307        let cursor = Cursor::new(&buffer);
308        let mut reader = E2StoreReader::new(cursor);
309
310        let version = reader.read_version()?;
311        assert!(version.is_some(), "Version entry should have been auto-added");
312
313        let entries = reader.entries()?;
314        assert_eq!(entries.len(), 3);
315        assert!(entries[0].is_version());
316        assert_eq!(entries[1].entry_type, [0x42, 0x42]);
317        assert_eq!(entries[1].data, vec![1, 2, 3, 4]);
318        assert_eq!(entries[2].entry_type, [0x43, 0x43]);
319        assert_eq!(entries[2].data, vec![5, 6, 7, 8]);
320
321        Ok(())
322    }
323
324    #[test]
325    fn test_writer_prevents_duplicate_versions() -> Result<(), E2sError> {
326        let mut buffer = Vec::new();
327
328        {
329            let mut writer = E2StoreWriter::new(&mut buffer);
330
331            // Call write_version multiple times, it should only write once
332            writer.write_version()?;
333            writer.write_version()?;
334            writer.write_version()?;
335
336            // Write an entry
337            let block_entry = Entry::new(SLOT_INDEX, create_slot_index_data(42, &[8192]));
338            writer.write_entry(&block_entry)?;
339
340            writer.flush()?;
341        }
342
343        // Verify only one version entry was written
344        let cursor = Cursor::new(&buffer);
345        let mut reader = E2StoreReader::new(cursor);
346
347        let entries = reader.entries()?;
348        assert_eq!(entries.len(), 2);
349        assert!(entries[0].is_version());
350        assert!(entries[1].is_slot_index());
351
352        Ok(())
353    }
354
355    #[test]
356    fn test_e2store_multiple_roundtrip_conversions() -> Result<(), E2sError> {
357        // Initial set of entries to test with varied types and sizes
358        let entry1 = Entry::new([0x01, 0x01], vec![1, 2, 3, 4, 5]);
359        let entry2 = Entry::new([0x02, 0x02], vec![10, 20, 30, 40, 50]);
360        let entry3 = Entry::new(SLOT_INDEX, create_slot_index_data(123, &[45678]));
361        let entry4 = Entry::new([0xFF, 0xFF], Vec::new());
362
363        println!("Initial entries count: 4");
364
365        // First write cycle : create initial buffer
366        let mut buffer1 = Vec::new();
367        {
368            let mut writer = E2StoreWriter::new(&mut buffer1);
369            writer.write_version()?;
370            writer.write_entry(&entry1)?;
371            writer.write_entry(&entry2)?;
372            writer.write_entry(&entry3)?;
373            writer.write_entry(&entry4)?;
374            writer.flush()?;
375        }
376
377        // First read cycle : read from initial buffer
378        let mut reader1 = E2StoreReader::new(Cursor::new(&buffer1));
379        let entries1 = reader1.entries()?;
380
381        println!("First read entries:");
382        for (i, entry) in entries1.iter().enumerate() {
383            println!("Entry {}: type {:?}, data len {}", i, entry.entry_type, entry.data.len());
384        }
385        println!("First read entries count: {}", entries1.len());
386
387        // Verify first read content
388        assert_eq!(entries1.len(), 5, "Should have 5 entries (version + 4 data)");
389        assert!(entries1[0].is_version());
390        assert_eq!(entries1[1].entry_type, [0x01, 0x01]);
391        assert_eq!(entries1[1].data, vec![1, 2, 3, 4, 5]);
392        assert_eq!(entries1[2].entry_type, [0x02, 0x02]);
393        assert_eq!(entries1[2].data, vec![10, 20, 30, 40, 50]);
394        assert!(entries1[3].is_slot_index());
395        assert_eq!(entries1[4].entry_type, [0xFF, 0xFF]);
396        assert_eq!(entries1[4].data.len(), 0);
397
398        // Second write cycle : write what we just read
399        let mut buffer2 = Vec::new();
400        {
401            let mut writer = E2StoreWriter::new(&mut buffer2);
402            // Only write version once
403            writer.write_version()?;
404
405            // Skip the first entry ie the version since we already wrote it
406            for entry in &entries1[1..] {
407                writer.write_entry(entry)?;
408            }
409            writer.flush()?;
410        }
411
412        // Second read cycle - read the second buffer
413        let mut reader2 = E2StoreReader::new(Cursor::new(&buffer2));
414        let entries2 = reader2.entries()?;
415
416        // Verify second read matches first read
417        assert_eq!(entries1.len(), entries2.len());
418        for i in 0..entries1.len() {
419            assert_eq!(entries1[i].entry_type, entries2[i].entry_type);
420            assert_eq!(entries1[i].data, entries2[i].data);
421        }
422
423        Ok(())
424    }
425}