reth_era/
era1_file.rs

1//! Represents a complete Era1 file
2//!
3//! The structure of an Era1 file follows the specification:
4//! `Version | block-tuple* | other-entries* | Accumulator | BlockIndex`
5//!
6//! See also <https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era1.md>.
7
8use crate::{
9    e2s_file::{E2StoreReader, E2StoreWriter},
10    e2s_types::{E2sError, Entry, IndexEntry, Version},
11    era1_types::{BlockIndex, Era1Group, Era1Id, BLOCK_INDEX},
12    era_file_ops::{EraFileFormat, FileReader, StreamReader, StreamWriter},
13    execution_types::{
14        self, Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
15        TotalDifficulty, MAX_BLOCKS_PER_ERA1,
16    },
17};
18use alloy_primitives::BlockNumber;
19use std::{
20    collections::VecDeque,
21    fs::File,
22    io::{Read, Seek, Write},
23};
24
25/// Era1 file interface
26#[derive(Debug)]
27pub struct Era1File {
28    /// Version record, must be the first record in the file
29    pub version: Version,
30
31    /// Main content group of the Era1 file
32    pub group: Era1Group,
33
34    /// File identifier
35    pub id: Era1Id,
36}
37
38impl EraFileFormat for Era1File {
39    type EraGroup = Era1Group;
40    type Id = Era1Id;
41
42    /// Create a new [`Era1File`]
43    fn new(group: Era1Group, id: Era1Id) -> Self {
44        Self { version: Version, group, id }
45    }
46
47    fn version(&self) -> &Version {
48        &self.version
49    }
50
51    fn group(&self) -> &Self::EraGroup {
52        &self.group
53    }
54
55    fn id(&self) -> &Self::Id {
56        &self.id
57    }
58}
59
60impl Era1File {
61    /// Get a block by its number, if present in this file
62    pub fn get_block_by_number(&self, number: BlockNumber) -> Option<&BlockTuple> {
63        let index = (number - self.group.block_index.starting_number()) as usize;
64        (index < self.group.blocks.len()).then(|| &self.group.blocks[index])
65    }
66
67    /// Get the range of block numbers contained in this file
68    pub fn block_range(&self) -> std::ops::RangeInclusive<BlockNumber> {
69        let start = self.group.block_index.starting_number();
70        let end = start + (self.group.blocks.len() as u64) - 1;
71        start..=end
72    }
73
74    /// Check if this file contains a specific block number
75    pub fn contains_block(&self, number: BlockNumber) -> bool {
76        self.block_range().contains(&number)
77    }
78}
79
80/// Reader for Era1 files that builds on top of [`E2StoreReader`]
81#[derive(Debug)]
82pub struct Era1Reader<R: Read> {
83    reader: E2StoreReader<R>,
84}
85
86/// An iterator of [`BlockTuple`] streaming from [`E2StoreReader`].
87#[derive(Debug)]
88pub struct BlockTupleIterator<R: Read> {
89    reader: E2StoreReader<R>,
90    headers: VecDeque<CompressedHeader>,
91    bodies: VecDeque<CompressedBody>,
92    receipts: VecDeque<CompressedReceipts>,
93    difficulties: VecDeque<TotalDifficulty>,
94    other_entries: Vec<Entry>,
95    accumulator: Option<Accumulator>,
96    block_index: Option<BlockIndex>,
97}
98
99impl<R: Read> BlockTupleIterator<R> {
100    fn new(reader: E2StoreReader<R>) -> Self {
101        Self {
102            reader,
103            headers: Default::default(),
104            bodies: Default::default(),
105            receipts: Default::default(),
106            difficulties: Default::default(),
107            other_entries: Default::default(),
108            accumulator: None,
109            block_index: None,
110        }
111    }
112}
113
114impl<R: Read + Seek> Iterator for BlockTupleIterator<R> {
115    type Item = Result<BlockTuple, E2sError>;
116
117    fn next(&mut self) -> Option<Self::Item> {
118        self.next_result().transpose()
119    }
120}
121
122impl<R: Read + Seek> BlockTupleIterator<R> {
123    fn next_result(&mut self) -> Result<Option<BlockTuple>, E2sError> {
124        loop {
125            let Some(entry) = self.reader.read_next_entry()? else {
126                return Ok(None);
127            };
128
129            match entry.entry_type {
130                execution_types::COMPRESSED_HEADER => {
131                    self.headers.push_back(CompressedHeader::from_entry(&entry)?);
132                }
133                execution_types::COMPRESSED_BODY => {
134                    self.bodies.push_back(CompressedBody::from_entry(&entry)?);
135                }
136                execution_types::COMPRESSED_RECEIPTS => {
137                    self.receipts.push_back(CompressedReceipts::from_entry(&entry)?);
138                }
139                execution_types::TOTAL_DIFFICULTY => {
140                    self.difficulties.push_back(TotalDifficulty::from_entry(&entry)?);
141                }
142                execution_types::ACCUMULATOR => {
143                    if self.accumulator.is_some() {
144                        return Err(E2sError::Ssz("Multiple accumulator entries found".to_string()));
145                    }
146                    self.accumulator = Some(Accumulator::from_entry(&entry)?);
147                }
148                BLOCK_INDEX => {
149                    if self.block_index.is_some() {
150                        return Err(E2sError::Ssz("Multiple block index entries found".to_string()));
151                    }
152                    self.block_index = Some(BlockIndex::from_entry(&entry)?);
153                }
154                _ => {
155                    self.other_entries.push(entry);
156                }
157            }
158
159            if !self.headers.is_empty() &&
160                !self.bodies.is_empty() &&
161                !self.receipts.is_empty() &&
162                !self.difficulties.is_empty()
163            {
164                let header = self.headers.pop_front().unwrap();
165                let body = self.bodies.pop_front().unwrap();
166                let receipt = self.receipts.pop_front().unwrap();
167                let difficulty = self.difficulties.pop_front().unwrap();
168
169                return Ok(Some(BlockTuple::new(header, body, receipt, difficulty)));
170            }
171        }
172    }
173}
174
175impl<R: Read + Seek> StreamReader<R> for Era1Reader<R> {
176    type File = Era1File;
177    type Iterator = BlockTupleIterator<R>;
178
179    /// Create a new [`Era1Reader`]
180    fn new(reader: R) -> Self {
181        Self { reader: E2StoreReader::new(reader) }
182    }
183
184    /// Returns an iterator of [`BlockTuple`] streaming from `reader`.
185    fn iter(self) -> BlockTupleIterator<R> {
186        BlockTupleIterator::new(self.reader)
187    }
188
189    fn read(self, network_name: String) -> Result<Self::File, E2sError> {
190        self.read_and_assemble(network_name)
191    }
192}
193
194impl<R: Read + Seek> Era1Reader<R> {
195    /// Reads and parses an Era1 file from the underlying reader, assembling all components
196    /// into a complete [`Era1File`] with an [`Era1Id`] that includes the provided network name.
197    pub fn read_and_assemble(mut self, network_name: String) -> Result<Era1File, E2sError> {
198        // Validate version entry
199        let _version_entry = match self.reader.read_version()? {
200            Some(entry) if entry.is_version() => entry,
201            Some(_) => return Err(E2sError::Ssz("First entry is not a Version entry".to_string())),
202            None => return Err(E2sError::Ssz("Empty Era1 file".to_string())),
203        };
204
205        let mut iter = self.iter();
206        let blocks = (&mut iter).collect::<Result<Vec<_>, _>>()?;
207
208        let BlockTupleIterator {
209            headers,
210            bodies,
211            receipts,
212            difficulties,
213            other_entries,
214            accumulator,
215            block_index,
216            ..
217        } = iter;
218
219        // Ensure we have matching counts for block components
220        if headers.len() != bodies.len() ||
221            headers.len() != receipts.len() ||
222            headers.len() != difficulties.len()
223        {
224            return Err(E2sError::Ssz(format!(
225                "Mismatched block component counts: headers={}, bodies={}, receipts={}, difficulties={}",
226                headers.len(), bodies.len(), receipts.len(), difficulties.len()
227            )));
228        }
229
230        let accumulator = accumulator
231            .ok_or_else(|| E2sError::Ssz("Era1 file missing accumulator entry".to_string()))?;
232
233        let block_index = block_index
234            .ok_or_else(|| E2sError::Ssz("Era1 file missing block index entry".to_string()))?;
235
236        let mut group = Era1Group::new(blocks, accumulator, block_index.clone());
237
238        // Add other entries
239        for entry in other_entries {
240            group.add_entry(entry);
241        }
242
243        let id = Era1Id::new(
244            network_name,
245            block_index.starting_number(),
246            block_index.offsets().len() as u32,
247        );
248
249        Ok(Era1File::new(group, id))
250    }
251}
252
253impl FileReader for Era1Reader<File> {}
254
255/// Writer for Era1 files that builds on top of [`E2StoreWriter`]
256#[derive(Debug)]
257pub struct Era1Writer<W: Write> {
258    writer: E2StoreWriter<W>,
259    has_written_version: bool,
260    has_written_blocks: bool,
261    has_written_accumulator: bool,
262    has_written_block_index: bool,
263}
264
265impl<W: Write> StreamWriter<W> for Era1Writer<W> {
266    type File = Era1File;
267
268    /// Create a new [`Era1Writer`]
269    fn new(writer: W) -> Self {
270        Self {
271            writer: E2StoreWriter::new(writer),
272            has_written_version: false,
273            has_written_blocks: false,
274            has_written_accumulator: false,
275            has_written_block_index: false,
276        }
277    }
278
279    /// Write the version entry
280    fn write_version(&mut self) -> Result<(), E2sError> {
281        if self.has_written_version {
282            return Ok(());
283        }
284
285        self.writer.write_version()?;
286        self.has_written_version = true;
287        Ok(())
288    }
289
290    /// Write a complete [`Era1File`] to the underlying writer
291    fn write_file(&mut self, era1_file: &Era1File) -> Result<(), E2sError> {
292        // Write version
293        self.write_version()?;
294
295        // Ensure blocks are written before other entries
296        if era1_file.group.blocks.len() > MAX_BLOCKS_PER_ERA1 {
297            return Err(E2sError::Ssz("Era1 file cannot contain more than 8192 blocks".to_string()));
298        }
299
300        // Write all blocks
301        for block in &era1_file.group.blocks {
302            self.write_block(block)?;
303        }
304
305        // Write other entries
306        for entry in &era1_file.group.other_entries {
307            self.writer.write_entry(entry)?;
308        }
309
310        // Write accumulator
311        self.write_accumulator(&era1_file.group.accumulator)?;
312
313        // Write block index
314        self.write_block_index(&era1_file.group.block_index)?;
315
316        // Flush the writer
317        self.writer.flush()?;
318
319        Ok(())
320    }
321
322    /// Flush any buffered data to the underlying writer
323    fn flush(&mut self) -> Result<(), E2sError> {
324        self.writer.flush()
325    }
326}
327
328impl<W: Write> Era1Writer<W> {
329    /// Write a single block tuple
330    pub fn write_block(
331        &mut self,
332        block_tuple: &crate::execution_types::BlockTuple,
333    ) -> Result<(), E2sError> {
334        if !self.has_written_version {
335            self.write_version()?;
336        }
337
338        if self.has_written_accumulator || self.has_written_block_index {
339            return Err(E2sError::Ssz(
340                "Cannot write blocks after accumulator or block index".to_string(),
341            ));
342        }
343
344        // Write header
345        let header_entry = block_tuple.header.to_entry();
346        self.writer.write_entry(&header_entry)?;
347
348        // Write body
349        let body_entry = block_tuple.body.to_entry();
350        self.writer.write_entry(&body_entry)?;
351
352        // Write receipts
353        let receipts_entry = block_tuple.receipts.to_entry();
354        self.writer.write_entry(&receipts_entry)?;
355
356        // Write difficulty
357        let difficulty_entry = block_tuple.total_difficulty.to_entry();
358        self.writer.write_entry(&difficulty_entry)?;
359
360        self.has_written_blocks = true;
361
362        Ok(())
363    }
364
365    /// Write the block index
366    pub fn write_block_index(&mut self, block_index: &BlockIndex) -> Result<(), E2sError> {
367        if !self.has_written_version {
368            self.write_version()?;
369        }
370
371        if self.has_written_block_index {
372            return Err(E2sError::Ssz("Block index already written".to_string()));
373        }
374
375        let block_index_entry = block_index.to_entry();
376        self.writer.write_entry(&block_index_entry)?;
377        self.has_written_block_index = true;
378
379        Ok(())
380    }
381
382    /// Write the accumulator
383    pub fn write_accumulator(&mut self, accumulator: &Accumulator) -> Result<(), E2sError> {
384        if !self.has_written_version {
385            self.write_version()?;
386        }
387
388        if self.has_written_accumulator {
389            return Err(E2sError::Ssz("Accumulator already written".to_string()));
390        }
391
392        if self.has_written_block_index {
393            return Err(E2sError::Ssz("Cannot write accumulator after block index".to_string()));
394        }
395
396        let accumulator_entry = accumulator.to_entry();
397        self.writer.write_entry(&accumulator_entry)?;
398        self.has_written_accumulator = true;
399        Ok(())
400    }
401}
402
403#[cfg(test)]
404mod tests {
405    use super::*;
406    use crate::{
407        era_file_ops::FileWriter,
408        execution_types::{
409            Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
410            TotalDifficulty,
411        },
412    };
413    use alloy_primitives::{B256, U256};
414    use std::io::Cursor;
415    use tempfile::tempdir;
416
417    // Helper to create a sample block tuple for testing
418    fn create_test_block(number: BlockNumber, data_size: usize) -> BlockTuple {
419        let header_data = vec![(number % 256) as u8; data_size];
420        let header = CompressedHeader::new(header_data);
421
422        let body_data = vec![((number + 1) % 256) as u8; data_size * 2];
423        let body = CompressedBody::new(body_data);
424
425        let receipts_data = vec![((number + 2) % 256) as u8; data_size];
426        let receipts = CompressedReceipts::new(receipts_data);
427
428        let difficulty = TotalDifficulty::new(U256::from(number * 1000));
429
430        BlockTuple::new(header, body, receipts, difficulty)
431    }
432
433    // Helper to create a sample Era1File for testing
434    fn create_test_era1_file(
435        start_block: BlockNumber,
436        block_count: usize,
437        network: &str,
438    ) -> Era1File {
439        // Create blocks
440        let mut blocks = Vec::with_capacity(block_count);
441        for i in 0..block_count {
442            let block_num = start_block + i as u64;
443            blocks.push(create_test_block(block_num, 32));
444        }
445
446        let accumulator = Accumulator::new(B256::from([0xAA; 32]));
447
448        let mut offsets = Vec::with_capacity(block_count);
449        for i in 0..block_count {
450            offsets.push(i as i64 * 100);
451        }
452        let block_index = BlockIndex::new(start_block, offsets);
453        let group = Era1Group::new(blocks, accumulator, block_index);
454        let id = Era1Id::new(network, start_block, block_count as u32);
455
456        Era1File::new(group, id)
457    }
458
459    #[test]
460    fn test_era1_roundtrip_memory() -> Result<(), E2sError> {
461        // Create a test Era1File
462        let start_block = 1000;
463        let era1_file = create_test_era1_file(1000, 5, "testnet");
464
465        // Write to memory buffer
466        let mut buffer = Vec::new();
467        {
468            let mut writer = Era1Writer::new(&mut buffer);
469            writer.write_file(&era1_file)?;
470        }
471
472        // Read back from memory buffer
473        let reader = Era1Reader::new(Cursor::new(&buffer));
474        let read_era1 = reader.read("testnet".to_string())?;
475
476        // Verify core properties
477        assert_eq!(read_era1.id.network_name, "testnet");
478        assert_eq!(read_era1.id.start_block, 1000);
479        assert_eq!(read_era1.id.block_count, 5);
480        assert_eq!(read_era1.group.blocks.len(), 5);
481
482        // Verify block properties
483        assert_eq!(read_era1.group.blocks[0].total_difficulty.value, U256::from(1000 * 1000));
484        assert_eq!(read_era1.group.blocks[1].total_difficulty.value, U256::from(1001 * 1000));
485
486        // Verify block data
487        assert_eq!(read_era1.group.blocks[0].header.data, vec![(start_block % 256) as u8; 32]);
488        assert_eq!(read_era1.group.blocks[0].body.data, vec![((start_block + 1) % 256) as u8; 64]);
489        assert_eq!(
490            read_era1.group.blocks[0].receipts.data,
491            vec![((start_block + 2) % 256) as u8; 32]
492        );
493
494        // Verify block access methods
495        assert!(read_era1.contains_block(1000));
496        assert!(read_era1.contains_block(1004));
497        assert!(!read_era1.contains_block(999));
498        assert!(!read_era1.contains_block(1005));
499
500        let block_1002 = read_era1.get_block_by_number(1002);
501        assert!(block_1002.is_some());
502        assert_eq!(block_1002.unwrap().header.data, vec![((start_block + 2) % 256) as u8; 32]);
503
504        Ok(())
505    }
506
507    #[test]
508    fn test_era1_roundtrip_file() -> Result<(), E2sError> {
509        // Create a temporary directory
510        let temp_dir = tempdir().expect("Failed to create temp directory");
511        let file_path = temp_dir.path().join("test_roundtrip.era1");
512
513        // Create and write `Era1File` to disk
514        let era1_file = create_test_era1_file(2000, 3, "mainnet");
515        Era1Writer::create(&file_path, &era1_file)?;
516
517        // Read it back
518        let read_era1 = Era1Reader::open(&file_path, "mainnet")?;
519
520        // Verify core properties
521        assert_eq!(read_era1.id.network_name, "mainnet");
522        assert_eq!(read_era1.id.start_block, 2000);
523        assert_eq!(read_era1.id.block_count, 3);
524        assert_eq!(read_era1.group.blocks.len(), 3);
525
526        // Verify blocks
527        for i in 0..3 {
528            let block_num = 2000 + i as u64;
529            let block = read_era1.get_block_by_number(block_num);
530            assert!(block.is_some());
531            assert_eq!(block.unwrap().header.data, vec![block_num as u8; 32]);
532        }
533
534        Ok(())
535    }
536}