Skip to main content

reth_era/era1/
file.rs

1//! Represents a complete Era1 file
2//!
3//! The structure of an Era1 file follows the specification:
4//! `Version | block-tuple* | other-entries* | Accumulator | BlockIndex`
5//!
6//! See also <https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era1.md>.
7
8use crate::{
9    common::file_ops::{EraFileFormat, StreamReader, StreamWriter},
10    e2s::{
11        error::E2sError,
12        file::{E2StoreReader, E2StoreWriter},
13        types::{Entry, IndexEntry, Version},
14    },
15    era1::types::{
16        execution::{
17            Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
18            TotalDifficulty, ACCUMULATOR, COMPRESSED_BODY, COMPRESSED_HEADER, COMPRESSED_RECEIPTS,
19            MAX_BLOCKS_PER_ERA1, TOTAL_DIFFICULTY,
20        },
21        group::{BlockIndex, Era1Group, Era1Id, BLOCK_INDEX},
22    },
23};
24use alloy_primitives::BlockNumber;
25use std::{
26    collections::VecDeque,
27    io::{Read, Seek, Write},
28};
29
/// Era1 file interface
///
/// In-memory form of a whole Era1 file: the leading version record, the
/// content group and the identifier describing the file.
#[derive(Debug)]
pub struct Era1File {
    /// Version record, must be the first record in the file
    pub version: Version,

    /// Main content group of the Era1 file
    pub group: Era1Group,

    /// File identifier
    pub id: Era1Id,
}
42
43impl EraFileFormat for Era1File {
44    type EraGroup = Era1Group;
45    type Id = Era1Id;
46
47    /// Create a new [`Era1File`]
48    fn new(group: Era1Group, id: Era1Id) -> Self {
49        Self { version: Version, group, id }
50    }
51
52    fn version(&self) -> &Version {
53        &self.version
54    }
55
56    fn group(&self) -> &Self::EraGroup {
57        &self.group
58    }
59
60    fn id(&self) -> &Self::Id {
61        &self.id
62    }
63}
64
65impl Era1File {
66    /// Get a block by its number, if present in this file
67    pub fn get_block_by_number(&self, number: BlockNumber) -> Option<&BlockTuple> {
68        let index = (number - self.group.block_index.starting_number()) as usize;
69        (index < self.group.blocks.len()).then(|| &self.group.blocks[index])
70    }
71
72    /// Get the range of block numbers contained in this file
73    pub fn block_range(&self) -> std::ops::RangeInclusive<BlockNumber> {
74        let start = self.group.block_index.starting_number();
75        let end = start + (self.group.blocks.len() as u64) - 1;
76        start..=end
77    }
78
79    /// Check if this file contains a specific block number
80    pub fn contains_block(&self, number: BlockNumber) -> bool {
81        self.block_range().contains(&number)
82    }
83}
84
/// Reader for Era1 files that builds on top of [`E2StoreReader`]
#[derive(Debug)]
pub struct Era1Reader<R: Read> {
    // Entry-level reader wrapping the caller-provided source.
    reader: E2StoreReader<R>,
}
90
/// An iterator of [`BlockTuple`] streaming from [`E2StoreReader`].
///
/// Entries are read one at a time and sorted into per-component queues; a
/// tuple is yielded as soon as every queue holds at least one element.
/// Entries that are not block components are kept on the iterator so they can
/// be inspected once iteration has finished.
#[derive(Debug)]
pub struct BlockTupleIterator<R: Read> {
    // Source of raw entries.
    reader: E2StoreReader<R>,
    // Parsed headers not yet paired into a tuple.
    headers: VecDeque<CompressedHeader>,
    // Parsed bodies not yet paired into a tuple.
    bodies: VecDeque<CompressedBody>,
    // Parsed receipts not yet paired into a tuple.
    receipts: VecDeque<CompressedReceipts>,
    // Parsed total difficulties not yet paired into a tuple.
    difficulties: VecDeque<TotalDifficulty>,
    // Entries whose type matches none of the recognised Era1 entry types.
    other_entries: Vec<Entry>,
    // Accumulator entry, once encountered; a second one is an error.
    accumulator: Option<Accumulator>,
    // Block index entry, once encountered; a second one is an error.
    block_index: Option<BlockIndex>,
}
103
104impl<R: Read> BlockTupleIterator<R> {
105    fn new(reader: E2StoreReader<R>) -> Self {
106        Self {
107            reader,
108            headers: Default::default(),
109            bodies: Default::default(),
110            receipts: Default::default(),
111            difficulties: Default::default(),
112            other_entries: Default::default(),
113            accumulator: None,
114            block_index: None,
115        }
116    }
117}
118
119impl<R: Read + Seek> Iterator for BlockTupleIterator<R> {
120    type Item = Result<BlockTuple, E2sError>;
121
122    fn next(&mut self) -> Option<Self::Item> {
123        self.next_result().transpose()
124    }
125}
126
127impl<R: Read + Seek> BlockTupleIterator<R> {
128    fn next_result(&mut self) -> Result<Option<BlockTuple>, E2sError> {
129        loop {
130            let Some(entry) = self.reader.read_next_entry()? else {
131                return Ok(None);
132            };
133
134            match entry.entry_type {
135                COMPRESSED_HEADER => {
136                    self.headers.push_back(CompressedHeader::from_entry(&entry)?);
137                }
138                COMPRESSED_BODY => {
139                    self.bodies.push_back(CompressedBody::from_entry(&entry)?);
140                }
141                COMPRESSED_RECEIPTS => {
142                    self.receipts.push_back(CompressedReceipts::from_entry(&entry)?);
143                }
144                TOTAL_DIFFICULTY => {
145                    self.difficulties.push_back(TotalDifficulty::from_entry(&entry)?);
146                }
147                ACCUMULATOR => {
148                    if self.accumulator.is_some() {
149                        return Err(E2sError::Ssz("Multiple accumulator entries found".to_string()));
150                    }
151                    self.accumulator = Some(Accumulator::from_entry(&entry)?);
152                }
153                BLOCK_INDEX => {
154                    if self.block_index.is_some() {
155                        return Err(E2sError::Ssz("Multiple block index entries found".to_string()));
156                    }
157                    self.block_index = Some(BlockIndex::from_entry(&entry)?);
158                }
159                _ => {
160                    self.other_entries.push(entry);
161                }
162            }
163
164            if !self.headers.is_empty() &&
165                !self.bodies.is_empty() &&
166                !self.receipts.is_empty() &&
167                !self.difficulties.is_empty()
168            {
169                let header = self.headers.pop_front().unwrap();
170                let body = self.bodies.pop_front().unwrap();
171                let receipt = self.receipts.pop_front().unwrap();
172                let difficulty = self.difficulties.pop_front().unwrap();
173
174                return Ok(Some(BlockTuple::new(header, body, receipt, difficulty)));
175            }
176        }
177    }
178}
179
180impl<R: Read + Seek> StreamReader<R> for Era1Reader<R> {
181    type File = Era1File;
182    type Iterator = BlockTupleIterator<R>;
183
184    /// Create a new [`Era1Reader`]
185    fn new(reader: R) -> Self {
186        Self { reader: E2StoreReader::new(reader) }
187    }
188
189    /// Returns an iterator of [`BlockTuple`] streaming from `reader`.
190    fn iter(self) -> BlockTupleIterator<R> {
191        BlockTupleIterator::new(self.reader)
192    }
193
194    fn read(self, network_name: String) -> Result<Self::File, E2sError> {
195        self.read_and_assemble(network_name)
196    }
197}
198
199impl<R: Read + Seek> Era1Reader<R> {
200    /// Reads and parses an Era1 file from the underlying reader, assembling all components
201    /// into a complete [`Era1File`] with an [`Era1Id`] that includes the provided network name.
202    pub fn read_and_assemble(mut self, network_name: String) -> Result<Era1File, E2sError> {
203        // Validate version entry
204        let _version_entry = match self.reader.read_version()? {
205            Some(entry) if entry.is_version() => entry,
206            Some(_) => return Err(E2sError::Ssz("First entry is not a Version entry".to_string())),
207            None => return Err(E2sError::Ssz("Empty Era1 file".to_string())),
208        };
209
210        let mut iter = self.iter();
211        let blocks = (&mut iter).collect::<Result<Vec<_>, _>>()?;
212
213        let BlockTupleIterator {
214            headers,
215            bodies,
216            receipts,
217            difficulties,
218            other_entries,
219            accumulator,
220            block_index,
221            ..
222        } = iter;
223
224        // Ensure we have matching counts for block components
225        if headers.len() != bodies.len() ||
226            headers.len() != receipts.len() ||
227            headers.len() != difficulties.len()
228        {
229            return Err(E2sError::Ssz(format!(
230                "Mismatched block component counts: headers={}, bodies={}, receipts={}, difficulties={}",
231                headers.len(), bodies.len(), receipts.len(), difficulties.len()
232            )));
233        }
234
235        let accumulator = accumulator
236            .ok_or_else(|| E2sError::Ssz("Era1 file missing accumulator entry".to_string()))?;
237
238        let block_index = block_index
239            .ok_or_else(|| E2sError::Ssz("Era1 file missing block index entry".to_string()))?;
240
241        let mut group = Era1Group::new(blocks, accumulator, block_index.clone());
242
243        // Add other entries
244        for entry in other_entries {
245            group.add_entry(entry);
246        }
247
248        let id = Era1Id::new(
249            network_name,
250            block_index.starting_number(),
251            block_index.offsets().len() as u32,
252        );
253
254        Ok(Era1File::new(group, id))
255    }
256}
257
/// Writer for Era1 files that builds on top of [`E2StoreWriter`]
///
/// The boolean flags record which parts of the file have been emitted so far;
/// the write methods consult them to reject out-of-order or repeated writes.
#[derive(Debug)]
pub struct Era1Writer<W: Write> {
    // Entry-level writer wrapping the caller-provided sink.
    writer: E2StoreWriter<W>,
    // Set once the version entry has been written.
    has_written_version: bool,
    // Set once at least one block tuple has been written.
    has_written_blocks: bool,
    // Set once the accumulator entry has been written.
    has_written_accumulator: bool,
    // Set once the block index entry has been written.
    has_written_block_index: bool,
}
267
268impl<W: Write> StreamWriter<W> for Era1Writer<W> {
269    type File = Era1File;
270
271    /// Create a new [`Era1Writer`]
272    fn new(writer: W) -> Self {
273        Self {
274            writer: E2StoreWriter::new(writer),
275            has_written_version: false,
276            has_written_blocks: false,
277            has_written_accumulator: false,
278            has_written_block_index: false,
279        }
280    }
281
282    /// Write the version entry
283    fn write_version(&mut self) -> Result<(), E2sError> {
284        if self.has_written_version {
285            return Ok(());
286        }
287
288        self.writer.write_version()?;
289        self.has_written_version = true;
290        Ok(())
291    }
292
293    /// Write a complete [`Era1File`] to the underlying writer
294    fn write_file(&mut self, era1_file: &Era1File) -> Result<(), E2sError> {
295        // Write version
296        self.write_version()?;
297
298        // Ensure blocks are written before other entries
299        if era1_file.group.blocks.len() > MAX_BLOCKS_PER_ERA1 {
300            return Err(E2sError::Ssz("Era1 file cannot contain more than 8192 blocks".to_string()));
301        }
302
303        // Write all blocks
304        for block in &era1_file.group.blocks {
305            self.write_block(block)?;
306        }
307
308        // Write other entries
309        for entry in &era1_file.group.other_entries {
310            self.writer.write_entry(entry)?;
311        }
312
313        // Write accumulator
314        self.write_accumulator(&era1_file.group.accumulator)?;
315
316        // Write block index
317        self.write_block_index(&era1_file.group.block_index)?;
318
319        // Flush the writer
320        self.writer.flush()?;
321
322        Ok(())
323    }
324
325    /// Flush any buffered data to the underlying writer
326    fn flush(&mut self) -> Result<(), E2sError> {
327        self.writer.flush()
328    }
329}
330
331impl<W: Write> Era1Writer<W> {
332    /// Write a single block tuple
333    pub fn write_block(&mut self, block_tuple: &BlockTuple) -> Result<(), E2sError> {
334        if !self.has_written_version {
335            self.write_version()?;
336        }
337
338        if self.has_written_accumulator || self.has_written_block_index {
339            return Err(E2sError::Ssz(
340                "Cannot write blocks after accumulator or block index".to_string(),
341            ));
342        }
343
344        // Write header
345        let header_entry = block_tuple.header.to_entry();
346        self.writer.write_entry(&header_entry)?;
347
348        // Write body
349        let body_entry = block_tuple.body.to_entry();
350        self.writer.write_entry(&body_entry)?;
351
352        // Write receipts
353        let receipts_entry = block_tuple.receipts.to_entry();
354        self.writer.write_entry(&receipts_entry)?;
355
356        // Write difficulty
357        let difficulty_entry = block_tuple.total_difficulty.to_entry();
358        self.writer.write_entry(&difficulty_entry)?;
359
360        self.has_written_blocks = true;
361
362        Ok(())
363    }
364
365    /// Write the block index
366    pub fn write_block_index(&mut self, block_index: &BlockIndex) -> Result<(), E2sError> {
367        if !self.has_written_version {
368            self.write_version()?;
369        }
370
371        if self.has_written_block_index {
372            return Err(E2sError::Ssz("Block index already written".to_string()));
373        }
374
375        let block_index_entry = block_index.to_entry();
376        self.writer.write_entry(&block_index_entry)?;
377        self.has_written_block_index = true;
378
379        Ok(())
380    }
381
382    /// Write the accumulator
383    pub fn write_accumulator(&mut self, accumulator: &Accumulator) -> Result<(), E2sError> {
384        if !self.has_written_version {
385            self.write_version()?;
386        }
387
388        if self.has_written_accumulator {
389            return Err(E2sError::Ssz("Accumulator already written".to_string()));
390        }
391
392        if self.has_written_block_index {
393            return Err(E2sError::Ssz("Cannot write accumulator after block index".to_string()));
394        }
395
396        let accumulator_entry = accumulator.to_entry();
397        self.writer.write_entry(&accumulator_entry)?;
398        self.has_written_accumulator = true;
399        Ok(())
400    }
401}
402
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::file_ops::{FileReader, FileWriter};
    use alloy_primitives::{B256, U256};
    use std::io::Cursor;
    use tempfile::tempdir;

    /// Builds a block tuple whose payload bytes are derived from `number`:
    /// the header is filled with `number % 256`, the body with
    /// `(number + 1) % 256` (twice as long), the receipts with
    /// `(number + 2) % 256`, and the total difficulty is `number * 1000`.
    fn create_test_block(number: BlockNumber, data_size: usize) -> BlockTuple {
        let fill = |shift: u64, len: usize| vec![((number + shift) % 256) as u8; len];

        let header = CompressedHeader::new(fill(0, data_size));
        let body = CompressedBody::new(fill(1, data_size * 2));
        let receipts = CompressedReceipts::new(fill(2, data_size));
        let difficulty = TotalDifficulty::new(U256::from(number * 1000));

        BlockTuple::new(header, body, receipts, difficulty)
    }

    /// Builds an [`Era1File`] with `block_count` consecutive test blocks
    /// starting at `start_block`, a fixed accumulator and evenly spaced offsets.
    fn create_test_era1_file(
        start_block: BlockNumber,
        block_count: usize,
        network: &str,
    ) -> Era1File {
        let blocks: Vec<_> =
            (0..block_count).map(|i| create_test_block(start_block + i as u64, 32)).collect();

        let accumulator = Accumulator::new(B256::from([0xAA; 32]));

        let offsets: Vec<_> = (0..block_count).map(|i| i as i64 * 100).collect();
        let block_index = BlockIndex::new(start_block, offsets);

        let group = Era1Group::new(blocks, accumulator, block_index);
        let id = Era1Id::new(network, start_block, block_count as u32);

        Era1File::new(group, id)
    }

    #[test]
    fn test_era1_roundtrip_memory() -> Result<(), E2sError> {
        let start_block = 1000;
        let era1_file = create_test_era1_file(1000, 5, "testnet");

        // Serialize into an in-memory buffer; the writer borrow ends with the block.
        let mut buffer = Vec::new();
        {
            let mut writer = Era1Writer::new(&mut buffer);
            writer.write_file(&era1_file)?;
        }

        // Parse the buffer back into a file.
        let read_era1 = Era1Reader::new(Cursor::new(&buffer)).read("testnet".to_string())?;

        // Identifier and block count survive the round trip.
        assert_eq!(read_era1.id.network_name, "testnet");
        assert_eq!(read_era1.id.start_block, 1000);
        assert_eq!(read_era1.id.block_count, 5);
        assert_eq!(read_era1.group.blocks.len(), 5);

        // Total difficulties of the first two blocks.
        assert_eq!(read_era1.group.blocks[0].total_difficulty.value, U256::from(1000 * 1000));
        assert_eq!(read_era1.group.blocks[1].total_difficulty.value, U256::from(1001 * 1000));

        // Payload bytes of the first block.
        let first = &read_era1.group.blocks[0];
        assert_eq!(first.header.data, vec![(start_block % 256) as u8; 32]);
        assert_eq!(first.body.data, vec![((start_block + 1) % 256) as u8; 64]);
        assert_eq!(first.receipts.data, vec![((start_block + 2) % 256) as u8; 32]);

        // Range membership at and around both boundaries.
        assert!(read_era1.contains_block(1000));
        assert!(read_era1.contains_block(1004));
        assert!(!read_era1.contains_block(999));
        assert!(!read_era1.contains_block(1005));

        // Lookup by block number.
        let block_1002 = read_era1.get_block_by_number(1002);
        assert!(block_1002.is_some());
        assert_eq!(block_1002.unwrap().header.data, vec![((start_block + 2) % 256) as u8; 32]);

        Ok(())
    }

    #[test]
    fn test_era1_roundtrip_file() -> Result<(), E2sError> {
        let temp_dir = tempdir().expect("Failed to create temp directory");
        let file_path = temp_dir.path().join("test_roundtrip.era1");

        // Write the file to disk, then read it back.
        let era1_file = create_test_era1_file(2000, 3, "mainnet");
        Era1Writer::create(&file_path, &era1_file)?;
        let read_era1 = Era1Reader::open(&file_path, "mainnet")?;

        // Identifier and block count survive the round trip.
        assert_eq!(read_era1.id.network_name, "mainnet");
        assert_eq!(read_era1.id.start_block, 2000);
        assert_eq!(read_era1.id.block_count, 3);
        assert_eq!(read_era1.group.blocks.len(), 3);

        // Every block is reachable by number and carries the expected header bytes.
        for block_num in 2000u64..2003 {
            let block = read_era1.get_block_by_number(block_num);
            assert!(block.is_some());
            assert_eq!(block.unwrap().header.data, vec![block_num as u8; 32]);
        }

        Ok(())
    }
}