reth_era/era1/
file.rs

1//! Represents a complete Era1 file
2//!
3//! The structure of an Era1 file follows the specification:
4//! `Version | block-tuple* | other-entries* | Accumulator | BlockIndex`
5//!
6//! See also <https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era1.md>.
7
8use crate::{
9    common::file_ops::{EraFileFormat, FileReader, StreamReader, StreamWriter},
10    e2s::{
11        error::E2sError,
12        file::{E2StoreReader, E2StoreWriter},
13        types::{Entry, IndexEntry, Version},
14    },
15    era1::types::{
16        execution::{
17            Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
18            TotalDifficulty, ACCUMULATOR, COMPRESSED_BODY, COMPRESSED_HEADER, COMPRESSED_RECEIPTS,
19            MAX_BLOCKS_PER_ERA1, TOTAL_DIFFICULTY,
20        },
21        group::{BlockIndex, Era1Group, Era1Id, BLOCK_INDEX},
22    },
23};
24use alloy_primitives::BlockNumber;
25use std::{
26    collections::VecDeque,
27    fs::File,
28    io::{Read, Seek, Write},
29};
30
31/// Era1 file interface
32#[derive(Debug)]
33pub struct Era1File {
34    /// Version record, must be the first record in the file
35    pub version: Version,
36
37    /// Main content group of the Era1 file
38    pub group: Era1Group,
39
40    /// File identifier
41    pub id: Era1Id,
42}
43
44impl EraFileFormat for Era1File {
45    type EraGroup = Era1Group;
46    type Id = Era1Id;
47
48    /// Create a new [`Era1File`]
49    fn new(group: Era1Group, id: Era1Id) -> Self {
50        Self { version: Version, group, id }
51    }
52
53    fn version(&self) -> &Version {
54        &self.version
55    }
56
57    fn group(&self) -> &Self::EraGroup {
58        &self.group
59    }
60
61    fn id(&self) -> &Self::Id {
62        &self.id
63    }
64}
65
66impl Era1File {
67    /// Get a block by its number, if present in this file
68    pub fn get_block_by_number(&self, number: BlockNumber) -> Option<&BlockTuple> {
69        let index = (number - self.group.block_index.starting_number()) as usize;
70        (index < self.group.blocks.len()).then(|| &self.group.blocks[index])
71    }
72
73    /// Get the range of block numbers contained in this file
74    pub fn block_range(&self) -> std::ops::RangeInclusive<BlockNumber> {
75        let start = self.group.block_index.starting_number();
76        let end = start + (self.group.blocks.len() as u64) - 1;
77        start..=end
78    }
79
80    /// Check if this file contains a specific block number
81    pub fn contains_block(&self, number: BlockNumber) -> bool {
82        self.block_range().contains(&number)
83    }
84}
85
86/// Reader for Era1 files that builds on top of [`E2StoreReader`]
87#[derive(Debug)]
88pub struct Era1Reader<R: Read> {
89    reader: E2StoreReader<R>,
90}
91
92/// An iterator of [`BlockTuple`] streaming from [`E2StoreReader`].
93#[derive(Debug)]
94pub struct BlockTupleIterator<R: Read> {
95    reader: E2StoreReader<R>,
96    headers: VecDeque<CompressedHeader>,
97    bodies: VecDeque<CompressedBody>,
98    receipts: VecDeque<CompressedReceipts>,
99    difficulties: VecDeque<TotalDifficulty>,
100    other_entries: Vec<Entry>,
101    accumulator: Option<Accumulator>,
102    block_index: Option<BlockIndex>,
103}
104
105impl<R: Read> BlockTupleIterator<R> {
106    fn new(reader: E2StoreReader<R>) -> Self {
107        Self {
108            reader,
109            headers: Default::default(),
110            bodies: Default::default(),
111            receipts: Default::default(),
112            difficulties: Default::default(),
113            other_entries: Default::default(),
114            accumulator: None,
115            block_index: None,
116        }
117    }
118}
119
120impl<R: Read + Seek> Iterator for BlockTupleIterator<R> {
121    type Item = Result<BlockTuple, E2sError>;
122
123    fn next(&mut self) -> Option<Self::Item> {
124        self.next_result().transpose()
125    }
126}
127
128impl<R: Read + Seek> BlockTupleIterator<R> {
129    fn next_result(&mut self) -> Result<Option<BlockTuple>, E2sError> {
130        loop {
131            let Some(entry) = self.reader.read_next_entry()? else {
132                return Ok(None);
133            };
134
135            match entry.entry_type {
136                COMPRESSED_HEADER => {
137                    self.headers.push_back(CompressedHeader::from_entry(&entry)?);
138                }
139                COMPRESSED_BODY => {
140                    self.bodies.push_back(CompressedBody::from_entry(&entry)?);
141                }
142                COMPRESSED_RECEIPTS => {
143                    self.receipts.push_back(CompressedReceipts::from_entry(&entry)?);
144                }
145                TOTAL_DIFFICULTY => {
146                    self.difficulties.push_back(TotalDifficulty::from_entry(&entry)?);
147                }
148                ACCUMULATOR => {
149                    if self.accumulator.is_some() {
150                        return Err(E2sError::Ssz("Multiple accumulator entries found".to_string()));
151                    }
152                    self.accumulator = Some(Accumulator::from_entry(&entry)?);
153                }
154                BLOCK_INDEX => {
155                    if self.block_index.is_some() {
156                        return Err(E2sError::Ssz("Multiple block index entries found".to_string()));
157                    }
158                    self.block_index = Some(BlockIndex::from_entry(&entry)?);
159                }
160                _ => {
161                    self.other_entries.push(entry);
162                }
163            }
164
165            if !self.headers.is_empty() &&
166                !self.bodies.is_empty() &&
167                !self.receipts.is_empty() &&
168                !self.difficulties.is_empty()
169            {
170                let header = self.headers.pop_front().unwrap();
171                let body = self.bodies.pop_front().unwrap();
172                let receipt = self.receipts.pop_front().unwrap();
173                let difficulty = self.difficulties.pop_front().unwrap();
174
175                return Ok(Some(BlockTuple::new(header, body, receipt, difficulty)));
176            }
177        }
178    }
179}
180
181impl<R: Read + Seek> StreamReader<R> for Era1Reader<R> {
182    type File = Era1File;
183    type Iterator = BlockTupleIterator<R>;
184
185    /// Create a new [`Era1Reader`]
186    fn new(reader: R) -> Self {
187        Self { reader: E2StoreReader::new(reader) }
188    }
189
190    /// Returns an iterator of [`BlockTuple`] streaming from `reader`.
191    fn iter(self) -> BlockTupleIterator<R> {
192        BlockTupleIterator::new(self.reader)
193    }
194
195    fn read(self, network_name: String) -> Result<Self::File, E2sError> {
196        self.read_and_assemble(network_name)
197    }
198}
199
200impl<R: Read + Seek> Era1Reader<R> {
201    /// Reads and parses an Era1 file from the underlying reader, assembling all components
202    /// into a complete [`Era1File`] with an [`Era1Id`] that includes the provided network name.
203    pub fn read_and_assemble(mut self, network_name: String) -> Result<Era1File, E2sError> {
204        // Validate version entry
205        let _version_entry = match self.reader.read_version()? {
206            Some(entry) if entry.is_version() => entry,
207            Some(_) => return Err(E2sError::Ssz("First entry is not a Version entry".to_string())),
208            None => return Err(E2sError::Ssz("Empty Era1 file".to_string())),
209        };
210
211        let mut iter = self.iter();
212        let blocks = (&mut iter).collect::<Result<Vec<_>, _>>()?;
213
214        let BlockTupleIterator {
215            headers,
216            bodies,
217            receipts,
218            difficulties,
219            other_entries,
220            accumulator,
221            block_index,
222            ..
223        } = iter;
224
225        // Ensure we have matching counts for block components
226        if headers.len() != bodies.len() ||
227            headers.len() != receipts.len() ||
228            headers.len() != difficulties.len()
229        {
230            return Err(E2sError::Ssz(format!(
231                "Mismatched block component counts: headers={}, bodies={}, receipts={}, difficulties={}",
232                headers.len(), bodies.len(), receipts.len(), difficulties.len()
233            )));
234        }
235
236        let accumulator = accumulator
237            .ok_or_else(|| E2sError::Ssz("Era1 file missing accumulator entry".to_string()))?;
238
239        let block_index = block_index
240            .ok_or_else(|| E2sError::Ssz("Era1 file missing block index entry".to_string()))?;
241
242        let mut group = Era1Group::new(blocks, accumulator, block_index.clone());
243
244        // Add other entries
245        for entry in other_entries {
246            group.add_entry(entry);
247        }
248
249        let id = Era1Id::new(
250            network_name,
251            block_index.starting_number(),
252            block_index.offsets().len() as u32,
253        );
254
255        Ok(Era1File::new(group, id))
256    }
257}
258
259impl FileReader for Era1Reader<File> {}
260
261/// Writer for Era1 files that builds on top of [`E2StoreWriter`]
262#[derive(Debug)]
263pub struct Era1Writer<W: Write> {
264    writer: E2StoreWriter<W>,
265    has_written_version: bool,
266    has_written_blocks: bool,
267    has_written_accumulator: bool,
268    has_written_block_index: bool,
269}
270
271impl<W: Write> StreamWriter<W> for Era1Writer<W> {
272    type File = Era1File;
273
274    /// Create a new [`Era1Writer`]
275    fn new(writer: W) -> Self {
276        Self {
277            writer: E2StoreWriter::new(writer),
278            has_written_version: false,
279            has_written_blocks: false,
280            has_written_accumulator: false,
281            has_written_block_index: false,
282        }
283    }
284
285    /// Write the version entry
286    fn write_version(&mut self) -> Result<(), E2sError> {
287        if self.has_written_version {
288            return Ok(());
289        }
290
291        self.writer.write_version()?;
292        self.has_written_version = true;
293        Ok(())
294    }
295
296    /// Write a complete [`Era1File`] to the underlying writer
297    fn write_file(&mut self, era1_file: &Era1File) -> Result<(), E2sError> {
298        // Write version
299        self.write_version()?;
300
301        // Ensure blocks are written before other entries
302        if era1_file.group.blocks.len() > MAX_BLOCKS_PER_ERA1 {
303            return Err(E2sError::Ssz("Era1 file cannot contain more than 8192 blocks".to_string()));
304        }
305
306        // Write all blocks
307        for block in &era1_file.group.blocks {
308            self.write_block(block)?;
309        }
310
311        // Write other entries
312        for entry in &era1_file.group.other_entries {
313            self.writer.write_entry(entry)?;
314        }
315
316        // Write accumulator
317        self.write_accumulator(&era1_file.group.accumulator)?;
318
319        // Write block index
320        self.write_block_index(&era1_file.group.block_index)?;
321
322        // Flush the writer
323        self.writer.flush()?;
324
325        Ok(())
326    }
327
328    /// Flush any buffered data to the underlying writer
329    fn flush(&mut self) -> Result<(), E2sError> {
330        self.writer.flush()
331    }
332}
333
334impl<W: Write> Era1Writer<W> {
335    /// Write a single block tuple
336    pub fn write_block(&mut self, block_tuple: &BlockTuple) -> Result<(), E2sError> {
337        if !self.has_written_version {
338            self.write_version()?;
339        }
340
341        if self.has_written_accumulator || self.has_written_block_index {
342            return Err(E2sError::Ssz(
343                "Cannot write blocks after accumulator or block index".to_string(),
344            ));
345        }
346
347        // Write header
348        let header_entry = block_tuple.header.to_entry();
349        self.writer.write_entry(&header_entry)?;
350
351        // Write body
352        let body_entry = block_tuple.body.to_entry();
353        self.writer.write_entry(&body_entry)?;
354
355        // Write receipts
356        let receipts_entry = block_tuple.receipts.to_entry();
357        self.writer.write_entry(&receipts_entry)?;
358
359        // Write difficulty
360        let difficulty_entry = block_tuple.total_difficulty.to_entry();
361        self.writer.write_entry(&difficulty_entry)?;
362
363        self.has_written_blocks = true;
364
365        Ok(())
366    }
367
368    /// Write the block index
369    pub fn write_block_index(&mut self, block_index: &BlockIndex) -> Result<(), E2sError> {
370        if !self.has_written_version {
371            self.write_version()?;
372        }
373
374        if self.has_written_block_index {
375            return Err(E2sError::Ssz("Block index already written".to_string()));
376        }
377
378        let block_index_entry = block_index.to_entry();
379        self.writer.write_entry(&block_index_entry)?;
380        self.has_written_block_index = true;
381
382        Ok(())
383    }
384
385    /// Write the accumulator
386    pub fn write_accumulator(&mut self, accumulator: &Accumulator) -> Result<(), E2sError> {
387        if !self.has_written_version {
388            self.write_version()?;
389        }
390
391        if self.has_written_accumulator {
392            return Err(E2sError::Ssz("Accumulator already written".to_string()));
393        }
394
395        if self.has_written_block_index {
396            return Err(E2sError::Ssz("Cannot write accumulator after block index".to_string()));
397        }
398
399        let accumulator_entry = accumulator.to_entry();
400        self.writer.write_entry(&accumulator_entry)?;
401        self.has_written_accumulator = true;
402        Ok(())
403    }
404}
405
406#[cfg(test)]
407mod tests {
408    use super::*;
409    use crate::common::file_ops::FileWriter;
410    use alloy_primitives::{B256, U256};
411    use std::io::Cursor;
412    use tempfile::tempdir;
413
414    // Helper to create a sample block tuple for testing
415    fn create_test_block(number: BlockNumber, data_size: usize) -> BlockTuple {
416        let header_data = vec![(number % 256) as u8; data_size];
417        let header = CompressedHeader::new(header_data);
418
419        let body_data = vec![((number + 1) % 256) as u8; data_size * 2];
420        let body = CompressedBody::new(body_data);
421
422        let receipts_data = vec![((number + 2) % 256) as u8; data_size];
423        let receipts = CompressedReceipts::new(receipts_data);
424
425        let difficulty = TotalDifficulty::new(U256::from(number * 1000));
426
427        BlockTuple::new(header, body, receipts, difficulty)
428    }
429
430    // Helper to create a sample Era1File for testing
431    fn create_test_era1_file(
432        start_block: BlockNumber,
433        block_count: usize,
434        network: &str,
435    ) -> Era1File {
436        // Create blocks
437        let mut blocks = Vec::with_capacity(block_count);
438        for i in 0..block_count {
439            let block_num = start_block + i as u64;
440            blocks.push(create_test_block(block_num, 32));
441        }
442
443        let accumulator = Accumulator::new(B256::from([0xAA; 32]));
444
445        let mut offsets = Vec::with_capacity(block_count);
446        for i in 0..block_count {
447            offsets.push(i as i64 * 100);
448        }
449        let block_index = BlockIndex::new(start_block, offsets);
450        let group = Era1Group::new(blocks, accumulator, block_index);
451        let id = Era1Id::new(network, start_block, block_count as u32);
452
453        Era1File::new(group, id)
454    }
455
456    #[test]
457    fn test_era1_roundtrip_memory() -> Result<(), E2sError> {
458        // Create a test Era1File
459        let start_block = 1000;
460        let era1_file = create_test_era1_file(1000, 5, "testnet");
461
462        // Write to memory buffer
463        let mut buffer = Vec::new();
464        {
465            let mut writer = Era1Writer::new(&mut buffer);
466            writer.write_file(&era1_file)?;
467        }
468
469        // Read back from memory buffer
470        let reader = Era1Reader::new(Cursor::new(&buffer));
471        let read_era1 = reader.read("testnet".to_string())?;
472
473        // Verify core properties
474        assert_eq!(read_era1.id.network_name, "testnet");
475        assert_eq!(read_era1.id.start_block, 1000);
476        assert_eq!(read_era1.id.block_count, 5);
477        assert_eq!(read_era1.group.blocks.len(), 5);
478
479        // Verify block properties
480        assert_eq!(read_era1.group.blocks[0].total_difficulty.value, U256::from(1000 * 1000));
481        assert_eq!(read_era1.group.blocks[1].total_difficulty.value, U256::from(1001 * 1000));
482
483        // Verify block data
484        assert_eq!(read_era1.group.blocks[0].header.data, vec![(start_block % 256) as u8; 32]);
485        assert_eq!(read_era1.group.blocks[0].body.data, vec![((start_block + 1) % 256) as u8; 64]);
486        assert_eq!(
487            read_era1.group.blocks[0].receipts.data,
488            vec![((start_block + 2) % 256) as u8; 32]
489        );
490
491        // Verify block access methods
492        assert!(read_era1.contains_block(1000));
493        assert!(read_era1.contains_block(1004));
494        assert!(!read_era1.contains_block(999));
495        assert!(!read_era1.contains_block(1005));
496
497        let block_1002 = read_era1.get_block_by_number(1002);
498        assert!(block_1002.is_some());
499        assert_eq!(block_1002.unwrap().header.data, vec![((start_block + 2) % 256) as u8; 32]);
500
501        Ok(())
502    }
503
504    #[test]
505    fn test_era1_roundtrip_file() -> Result<(), E2sError> {
506        // Create a temporary directory
507        let temp_dir = tempdir().expect("Failed to create temp directory");
508        let file_path = temp_dir.path().join("test_roundtrip.era1");
509
510        // Create and write `Era1File` to disk
511        let era1_file = create_test_era1_file(2000, 3, "mainnet");
512        Era1Writer::create(&file_path, &era1_file)?;
513
514        // Read it back
515        let read_era1 = Era1Reader::open(&file_path, "mainnet")?;
516
517        // Verify core properties
518        assert_eq!(read_era1.id.network_name, "mainnet");
519        assert_eq!(read_era1.id.start_block, 2000);
520        assert_eq!(read_era1.id.block_count, 3);
521        assert_eq!(read_era1.group.blocks.len(), 3);
522
523        // Verify blocks
524        for i in 0..3 {
525            let block_num = 2000 + i as u64;
526            let block = read_era1.get_block_by_number(block_num);
527            assert!(block.is_some());
528            assert_eq!(block.unwrap().header.data, vec![block_num as u8; 32]);
529        }
530
531        Ok(())
532    }
533}