reth_era/e2s/
file.rs

//! `E2Store` file reader and writer
//!
//! See also <https://github.com/status-im/nimbus-eth2/blob/stable/docs/e2store.md>
4
5use crate::e2s::{
6    error::E2sError,
7    types::{Entry, Version},
8};
9use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
10
/// Reader for `E2Store` files, backed by a [`BufReader`] around the underlying source.
#[derive(Debug)]
pub struct E2StoreReader<R>
where
    R: Read,
{
    /// Buffered handle that entries are read from
    reader: BufReader<R>,
}
18
19impl<R: Read + Seek> E2StoreReader<R> {
20    /// Create a new [`E2StoreReader`]
21    pub fn new(reader: R) -> Self {
22        Self { reader: BufReader::new(reader) }
23    }
24
25    /// Read and validate the version record
26    pub fn read_version(&mut self) -> Result<Option<Entry>, E2sError> {
27        // Reset reader to beginning
28        self.reader.seek(SeekFrom::Start(0))?;
29
30        match Entry::read(&mut self.reader)? {
31            Some(entry) if entry.is_version() => Ok(Some(entry)),
32            Some(_) => Err(E2sError::Ssz("First entry must be a Version entry".to_string())),
33            None => Ok(None),
34        }
35    }
36
37    /// Read the next entry from the file
38    pub fn read_next_entry(&mut self) -> Result<Option<Entry>, E2sError> {
39        Entry::read(&mut self.reader)
40    }
41
42    /// Read all entries from the file, including the version entry
43    pub fn entries(&mut self) -> Result<Vec<Entry>, E2sError> {
44        // Reset reader to beginning
45        self.reader.seek(SeekFrom::Start(0))?;
46
47        let mut entries = Vec::new();
48
49        while let Some(entry) = self.read_next_entry()? {
50            entries.push(entry);
51        }
52
53        Ok(entries)
54    }
55}
56
/// Writer for `E2Store` files, backed by a [`BufWriter`] around the underlying sink.
#[derive(Debug)]
pub struct E2StoreWriter<W>
where
    W: Write,
{
    /// Buffered handle that entries are written to
    writer: BufWriter<W>,
    /// Set once a version entry has been written through this writer
    has_written_version: bool,
}
65
66impl<W: Write> E2StoreWriter<W> {
67    /// Create a new [`E2StoreWriter`]
68    pub fn new(writer: W) -> Self {
69        Self { writer: BufWriter::new(writer), has_written_version: false }
70    }
71
72    /// Create a new [`E2StoreWriter`] and write the version entry
73    pub fn with_version(writer: W) -> Result<Self, E2sError> {
74        let mut writer = Self::new(writer);
75        writer.write_version()?;
76        Ok(writer)
77    }
78
79    /// Write the version entry as the first entry in the file.
80    /// If not called explicitly, it will be written automatically before the first non-version
81    /// entry.
82    pub fn write_version(&mut self) -> Result<(), E2sError> {
83        if self.has_written_version {
84            return Ok(());
85        }
86
87        let version = Version;
88        version.encode(&mut self.writer)?;
89        self.has_written_version = true;
90        Ok(())
91    }
92
93    /// Write an entry to the file.
94    /// If a version entry has not been written yet, it will be added.
95    pub fn write_entry(&mut self, entry: &Entry) -> Result<(), E2sError> {
96        if !self.has_written_version {
97            self.write_version()?;
98        }
99
100        entry.write(&mut self.writer)?;
101        Ok(())
102    }
103
104    /// Flush any buffered data to the underlying writer
105    pub fn flush(&mut self) -> Result<(), E2sError> {
106        self.writer.flush().map_err(E2sError::Io)
107    }
108}
109
#[cfg(test)]
mod tests {
    use super::*;
    use crate::e2s::types::{SLOT_INDEX, VERSION};
    use std::io::Cursor;

    /// Builds the payload of a slot-index entry.
    ///
    /// Layout, every field being 8 little-endian bytes:
    /// `starting-slot | index | index | index ... | count`
    fn create_slot_index_data(starting_slot: u64, offsets: &[i64]) -> Vec<u8> {
        let mut data = Vec::with_capacity(8 + offsets.len() * 8 + 8);

        // Starting slot
        data.extend_from_slice(&starting_slot.to_le_bytes());

        // One index per offset
        for offset in offsets {
            data.extend_from_slice(&offset.to_le_bytes());
        }

        // Trailing count
        data.extend_from_slice(&(offsets.len() as i64).to_le_bytes());

        data
    }

    /// Reading a hand-assembled file yields all entries in order, version first.
    #[test]
    fn test_e2store_reader() -> Result<(), E2sError> {
        // Assemble a mock e2store file in memory
        let mut mock_file = Vec::new();

        let version_entry = Entry::new(VERSION, Vec::new());
        version_entry.write(&mut mock_file)?;

        let slot_index_entry1 = Entry::new(SLOT_INDEX, create_slot_index_data(1, &[1024]));
        slot_index_entry1.write(&mut mock_file)?;

        let slot_index_entry2 = Entry::new(SLOT_INDEX, create_slot_index_data(2, &[2048]));
        slot_index_entry2.write(&mut mock_file)?;

        let custom_type = [0x99, 0x99];
        let custom_entry = Entry::new(custom_type, vec![10, 11, 12]);
        custom_entry.write(&mut mock_file)?;

        let mut e2store_reader = E2StoreReader::new(Cursor::new(mock_file));

        let version = e2store_reader.read_version()?;
        assert!(version.is_some());

        // `entries` rewinds, so the version entry is part of the result again
        let entries = e2store_reader.entries()?;

        assert_eq!(entries.len(), 4);
        assert!(entries[0].is_version());
        assert!(entries[1].is_slot_index());
        assert!(entries[2].is_slot_index());
        assert_eq!(entries[3].entry_type, [0x99, 0x99]);

        Ok(())
    }

    /// The slot-index payload has the expected layout and survives a write/read round trip.
    #[test]
    fn test_slot_index_with_multiple_offsets() -> Result<(), E2sError> {
        let starting_slot = 100;
        let offsets = &[1024, 2048, 0, 0, 3072, 0, 4096];

        let slot_index_data = create_slot_index_data(starting_slot, offsets);

        let slot_index_entry = Entry::new(SLOT_INDEX, slot_index_data.clone());

        // Verify the slot index data format
        assert_eq!(slot_index_data.len(), 8 + offsets.len() * 8 + 8);

        // Check the starting slot
        let mut starting_slot_bytes = [0u8; 8];
        starting_slot_bytes.copy_from_slice(&slot_index_data[0..8]);
        assert_eq!(u64::from_le_bytes(starting_slot_bytes), starting_slot);

        // Check the count at the end
        let mut count_bytes = [0u8; 8];
        count_bytes.copy_from_slice(&slot_index_data[slot_index_data.len() - 8..]);
        assert_eq!(i64::from_le_bytes(count_bytes), offsets.len() as i64);

        // Verify we can write and read it back
        let mut buffer = Vec::new();
        slot_index_entry.write(&mut buffer)?;

        let mut reader = E2StoreReader::new(Cursor::new(buffer));

        let read_entry = reader.read_next_entry()?.unwrap();
        assert!(read_entry.is_slot_index());

        // Compare the payload itself, not just its length
        assert_eq!(read_entry.data.len(), slot_index_data.len());
        assert_eq!(read_entry.data, slot_index_data);

        Ok(())
    }

    /// An empty file has neither a version entry nor any other entries.
    #[test]
    fn test_empty_file() -> Result<(), E2sError> {
        let mock_file = Vec::new();

        let mut e2store_reader = E2StoreReader::new(Cursor::new(mock_file));

        // Reading version should return None
        let version = e2store_reader.read_version()?;
        assert!(version.is_none());

        // Entries should be empty
        let entries = e2store_reader.entries()?;
        assert!(entries.is_empty());

        Ok(())
    }

    /// `read_next_entry` walks the file entry by entry and ends with `None`.
    #[test]
    fn test_read_next_entry() -> Result<(), E2sError> {
        let mut mock_file = Vec::new();

        let version_entry = Entry::new(VERSION, Vec::new());
        version_entry.write(&mut mock_file)?;

        let slot_entry = Entry::new(SLOT_INDEX, create_slot_index_data(1, &[1024]));
        slot_entry.write(&mut mock_file)?;

        let mut reader = E2StoreReader::new(Cursor::new(mock_file));

        let first = reader.read_next_entry()?.unwrap();
        assert!(first.is_version());

        let second = reader.read_next_entry()?.unwrap();
        assert!(second.is_slot_index());

        let third = reader.read_next_entry()?;
        assert!(third.is_none());

        Ok(())
    }

    /// Entries written through the writer can be read back in the same order.
    #[test]
    fn test_e2store_writer() -> Result<(), E2sError> {
        let mut buffer = Vec::new();

        {
            let mut writer = E2StoreWriter::new(&mut buffer);

            // Write version entry
            writer.write_version()?;

            // Write a slot index entry
            let slot_index_entry = Entry::new(SLOT_INDEX, create_slot_index_data(1, &[1024]));
            writer.write_entry(&slot_index_entry)?;

            // Write a custom entry
            let custom_type = [0x99, 0x99];
            let custom_entry = Entry::new(custom_type, vec![10, 11, 12]);
            writer.write_entry(&custom_entry)?;

            writer.flush()?;
        }

        let mut reader = E2StoreReader::new(Cursor::new(&buffer));

        let entries = reader.entries()?;
        assert_eq!(entries.len(), 3);
        assert!(entries[0].is_version());
        assert!(entries[1].is_slot_index());
        assert_eq!(entries[2].entry_type, [0x99, 0x99]);

        Ok(())
    }

    /// Writing a non-version entry first makes the writer insert the version entry itself.
    #[test]
    fn test_writer_implicit_version_insertion() -> Result<(), E2sError> {
        let mut buffer = Vec::new();

        {
            // Writer without explicitly writing the version
            let mut writer = E2StoreWriter::new(&mut buffer);

            // Write an entry, it should automatically add a version first
            let custom_type = [0x42, 0x42];
            let custom_entry = Entry::new(custom_type, vec![1, 2, 3, 4]);
            writer.write_entry(&custom_entry)?;

            // Write another entry
            let another_custom = Entry::new([0x43, 0x43], vec![5, 6, 7, 8]);
            writer.write_entry(&another_custom)?;

            writer.flush()?;
        }

        let mut reader = E2StoreReader::new(Cursor::new(&buffer));

        let version = reader.read_version()?;
        assert!(version.is_some(), "Version entry should have been auto-added");

        let entries = reader.entries()?;
        assert_eq!(entries.len(), 3);
        assert!(entries[0].is_version());
        assert_eq!(entries[1].entry_type, [0x42, 0x42]);
        assert_eq!(entries[1].data, vec![1, 2, 3, 4]);
        assert_eq!(entries[2].entry_type, [0x43, 0x43]);
        assert_eq!(entries[2].data, vec![5, 6, 7, 8]);

        Ok(())
    }

    /// Repeated `write_version` calls emit a single version entry.
    #[test]
    fn test_writer_prevents_duplicate_versions() -> Result<(), E2sError> {
        let mut buffer = Vec::new();

        {
            let mut writer = E2StoreWriter::new(&mut buffer);

            // Call write_version multiple times, it should only write once
            writer.write_version()?;
            writer.write_version()?;
            writer.write_version()?;

            // Write an entry
            let slot_index_entry = Entry::new(SLOT_INDEX, create_slot_index_data(42, &[8192]));
            writer.write_entry(&slot_index_entry)?;

            writer.flush()?;
        }

        // Verify only one version entry was written
        let mut reader = E2StoreReader::new(Cursor::new(&buffer));

        let entries = reader.entries()?;
        assert_eq!(entries.len(), 2);
        assert!(entries[0].is_version());
        assert!(entries[1].is_slot_index());

        Ok(())
    }

    /// Writing, reading, re-writing and re-reading a set of entries preserves all of them.
    #[test]
    fn test_e2store_multiple_roundtrip_conversions() -> Result<(), E2sError> {
        // Initial set of entries to test with varied types and sizes
        let entry1 = Entry::new([0x01, 0x01], vec![1, 2, 3, 4, 5]);
        let entry2 = Entry::new([0x02, 0x02], vec![10, 20, 30, 40, 50]);
        let entry3 = Entry::new(SLOT_INDEX, create_slot_index_data(123, &[45678]));
        let entry4 = Entry::new([0xFF, 0xFF], Vec::new());

        // First write cycle: create initial buffer
        let mut buffer1 = Vec::new();
        {
            let mut writer = E2StoreWriter::new(&mut buffer1);
            writer.write_version()?;
            writer.write_entry(&entry1)?;
            writer.write_entry(&entry2)?;
            writer.write_entry(&entry3)?;
            writer.write_entry(&entry4)?;
            writer.flush()?;
        }

        // First read cycle: read from initial buffer
        let mut reader1 = E2StoreReader::new(Cursor::new(&buffer1));
        let entries1 = reader1.entries()?;

        // Verify first read content
        assert_eq!(entries1.len(), 5, "Should have 5 entries (version + 4 data)");
        assert!(entries1[0].is_version());
        assert_eq!(entries1[1].entry_type, [0x01, 0x01]);
        assert_eq!(entries1[1].data, vec![1, 2, 3, 4, 5]);
        assert_eq!(entries1[2].entry_type, [0x02, 0x02]);
        assert_eq!(entries1[2].data, vec![10, 20, 30, 40, 50]);
        assert!(entries1[3].is_slot_index());
        assert_eq!(entries1[4].entry_type, [0xFF, 0xFF]);
        assert_eq!(entries1[4].data.len(), 0);

        // Second write cycle: write what we just read
        let mut buffer2 = Vec::new();
        {
            let mut writer = E2StoreWriter::new(&mut buffer2);
            // Only write version once
            writer.write_version()?;

            // Skip the first entry, i.e. the version, since we already wrote it
            for entry in &entries1[1..] {
                writer.write_entry(entry)?;
            }
            writer.flush()?;
        }

        // Second read cycle: read the second buffer
        let mut reader2 = E2StoreReader::new(Cursor::new(&buffer2));
        let entries2 = reader2.entries()?;

        // Verify second read matches first read
        assert_eq!(entries1.len(), entries2.len());
        for (first, second) in entries1.iter().zip(&entries2) {
            assert_eq!(first.entry_type, second.entry_type);
            assert_eq!(first.data, second.data);
        }

        Ok(())
    }
}