Skip to main content

reth_era/era/
file.rs

1//! Represents a complete Era file
2//!
3//! The structure of an Era file follows the specification:
4//! `Version | block* | era-state | other-entries* | slot-index(block)? | slot-index(state)`
5//!
6//! See also <https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era.md>.
7
8use crate::{
9    common::file_ops::{EraFileFormat, StreamReader, StreamWriter},
10    e2s::{
11        error::E2sError,
12        file::{E2StoreReader, E2StoreWriter},
13        types::{Entry, IndexEntry, Version, SLOT_INDEX},
14    },
15    era::types::{
16        consensus::{
17            CompressedBeaconState, CompressedSignedBeaconBlock, COMPRESSED_BEACON_STATE,
18            COMPRESSED_SIGNED_BEACON_BLOCK,
19        },
20        group::{EraGroup, EraId, SlotIndex},
21    },
22};
23use std::io::{Read, Seek, Write};
24
25/// Era file interface
26#[derive(Debug)]
27pub struct EraFile {
28    /// Version record, must be the first record in the file
29    pub version: Version,
30
31    /// Main content group of the Era file
32    pub group: EraGroup,
33
34    /// File identifier
35    pub id: EraId,
36}
37
38impl EraFileFormat for EraFile {
39    type EraGroup = EraGroup;
40    type Id = EraId;
41
42    /// Create a new [`EraFile`]
43    fn new(group: EraGroup, id: EraId) -> Self {
44        Self { version: Version, group, id }
45    }
46
47    fn version(&self) -> &Version {
48        &self.version
49    }
50
51    fn group(&self) -> &Self::EraGroup {
52        &self.group
53    }
54
55    fn id(&self) -> &Self::Id {
56        &self.id
57    }
58}
59
60/// Reader for era files that builds on top of [`E2StoreReader`]
61#[derive(Debug)]
62pub struct EraReader<R: Read> {
63    reader: E2StoreReader<R>,
64}
65
66/// An iterator of [`BeaconBlockIterator`] streaming from [`E2StoreReader`].
67#[derive(Debug)]
68pub struct BeaconBlockIterator<R: Read> {
69    reader: E2StoreReader<R>,
70    state: Option<CompressedBeaconState>,
71    other_entries: Vec<Entry>,
72    block_slot_index: Option<SlotIndex>,
73    state_slot_index: Option<SlotIndex>,
74}
75
76impl<R: Read> BeaconBlockIterator<R> {
77    fn new(reader: E2StoreReader<R>) -> Self {
78        Self {
79            reader,
80            state: None,
81            other_entries: Default::default(),
82            block_slot_index: None,
83            state_slot_index: None,
84        }
85    }
86}
87
88impl<R: Read + Seek> Iterator for BeaconBlockIterator<R> {
89    type Item = Result<CompressedSignedBeaconBlock, E2sError>;
90
91    fn next(&mut self) -> Option<Self::Item> {
92        self.next_result().transpose()
93    }
94}
95
96impl<R: Read + Seek> BeaconBlockIterator<R> {
97    fn next_result(&mut self) -> Result<Option<CompressedSignedBeaconBlock>, E2sError> {
98        loop {
99            let Some(entry) = self.reader.read_next_entry()? else {
100                return Ok(None);
101            };
102
103            match entry.entry_type {
104                COMPRESSED_SIGNED_BEACON_BLOCK => {
105                    let block = CompressedSignedBeaconBlock::from_entry(&entry)?;
106                    return Ok(Some(block));
107                }
108                COMPRESSED_BEACON_STATE => {
109                    if self.state.is_some() {
110                        return Err(E2sError::Ssz("Multiple state entries found".to_string()));
111                    }
112                    self.state = Some(CompressedBeaconState::from_entry(&entry)?);
113                }
114                SLOT_INDEX => {
115                    let slot_index = SlotIndex::from_entry(&entry)?;
116                    // if we haven't seen the state yet, the slot index is for blocks,
117                    // if we have seen the state, the slot index is for the state
118                    if self.state.is_none() {
119                        self.block_slot_index = Some(slot_index);
120                    } else {
121                        self.state_slot_index = Some(slot_index);
122                    }
123                }
124                _ => {
125                    self.other_entries.push(entry);
126                }
127            }
128        }
129    }
130}
131
132impl<R: Read + Seek> StreamReader<R> for EraReader<R> {
133    type File = EraFile;
134    type Iterator = BeaconBlockIterator<R>;
135
136    /// Create a new [`EraReader`]
137    fn new(reader: R) -> Self {
138        Self { reader: E2StoreReader::new(reader) }
139    }
140
141    /// Returns an iterator of [`BeaconBlockIterator`] streaming from `reader`.
142    fn iter(self) -> BeaconBlockIterator<R> {
143        BeaconBlockIterator::new(self.reader)
144    }
145
146    fn read(self, network_name: String) -> Result<Self::File, E2sError> {
147        self.read_and_assemble(network_name)
148    }
149}
150
151impl<R: Read + Seek> EraReader<R> {
152    /// Reads and parses an era file from the underlying reader, assembling all components
153    /// into a complete [`EraFile`] with an [`EraId`] that includes the provided network name.
154    pub fn read_and_assemble(mut self, network_name: String) -> Result<EraFile, E2sError> {
155        // Validate version entry
156        let _version_entry = match self.reader.read_version()? {
157            Some(entry) if entry.is_version() => entry,
158            Some(_) => return Err(E2sError::Ssz("First entry is not a Version entry".to_string())),
159            None => return Err(E2sError::Ssz("Empty Era file".to_string())),
160        };
161
162        let mut iter = self.iter();
163        let blocks = (&mut iter).collect::<Result<Vec<_>, _>>()?;
164
165        let BeaconBlockIterator {
166            state, other_entries, block_slot_index, state_slot_index, ..
167        } = iter;
168
169        let state =
170            state.ok_or_else(|| E2sError::Ssz("Era file missing state entry".to_string()))?;
171
172        let state_slot_index = state_slot_index
173            .ok_or_else(|| E2sError::Ssz("Era file missing state slot index".to_string()))?;
174
175        // Create appropriate `EraGroup`, genesis vs non-genesis
176        let mut group = if let Some(block_index) = block_slot_index {
177            EraGroup::with_block_index(blocks, state, block_index, state_slot_index)
178        } else {
179            EraGroup::new(blocks, state, state_slot_index)
180        };
181
182        // Add other entries
183        for entry in other_entries {
184            group.add_entry(entry);
185        }
186
187        let (start_slot, slot_count) = group.slot_range();
188
189        let id = EraId::new(network_name, start_slot, slot_count);
190
191        Ok(EraFile::new(group, id))
192    }
193}
194
195/// Writer for Era files that builds on top of [`E2StoreWriter`]
196#[derive(Debug)]
197pub struct EraWriter<W: Write> {
198    writer: E2StoreWriter<W>,
199    has_written_version: bool,
200    has_written_state: bool,
201    has_written_block_slot_index: bool,
202    has_written_state_slot_index: bool,
203}
204
205impl<W: Write> StreamWriter<W> for EraWriter<W> {
206    type File = EraFile;
207
208    /// Create a new [`EraWriter`]
209    fn new(writer: W) -> Self {
210        Self {
211            writer: E2StoreWriter::new(writer),
212            has_written_version: false,
213            has_written_state: false,
214            has_written_block_slot_index: false,
215            has_written_state_slot_index: false,
216        }
217    }
218
219    /// Write the version entry
220    fn write_version(&mut self) -> Result<(), E2sError> {
221        if self.has_written_version {
222            return Ok(());
223        }
224
225        self.writer.write_version()?;
226        self.has_written_version = true;
227        Ok(())
228    }
229
230    fn write_file(&mut self, file: &Self::File) -> Result<(), E2sError> {
231        // Write version
232        self.write_version()?;
233
234        // Write all blocks
235        for block in &file.group.blocks {
236            self.write_beacon_block(block)?;
237        }
238
239        // Write state
240        self.write_beacon_state(&file.group.era_state)?;
241
242        // Write other entries
243        for entry in &file.group.other_entries {
244            self.writer.write_entry(entry)?;
245        }
246
247        // Write slot index
248        if let Some(ref block_index) = file.group.slot_index {
249            self.write_block_slot_index(block_index)?;
250        }
251
252        // Write state index
253        self.write_state_slot_index(&file.group.state_slot_index)?;
254
255        self.writer.flush()?;
256        Ok(())
257    }
258
259    /// Flush any buffered data to the underlying writer
260    fn flush(&mut self) -> Result<(), E2sError> {
261        self.writer.flush()
262    }
263}
264
265impl<W: Write> EraWriter<W> {
266    /// Write beacon block
267    pub fn write_beacon_block(
268        &mut self,
269        block: &CompressedSignedBeaconBlock,
270    ) -> Result<(), E2sError> {
271        self.ensure_version_written()?;
272
273        // Ensure blocks are written before state/indices
274        if self.has_written_state ||
275            self.has_written_block_slot_index ||
276            self.has_written_state_slot_index
277        {
278            return Err(E2sError::Ssz("Cannot write blocks after state or indices".to_string()));
279        }
280
281        let entry = block.to_entry();
282        self.writer.write_entry(&entry)?;
283        Ok(())
284    }
285
286    // Write beacon state
287    fn write_beacon_state(&mut self, state: &CompressedBeaconState) -> Result<(), E2sError> {
288        self.ensure_version_written()?;
289
290        if self.has_written_state {
291            return Err(E2sError::Ssz("State already written".to_string()));
292        }
293
294        let entry = state.to_entry();
295        self.writer.write_entry(&entry)?;
296        self.has_written_state = true;
297        Ok(())
298    }
299
300    /// Write the block slot index
301    pub fn write_block_slot_index(&mut self, slot_index: &SlotIndex) -> Result<(), E2sError> {
302        self.ensure_version_written()?;
303
304        if self.has_written_block_slot_index {
305            return Err(E2sError::Ssz("Block slot index already written".to_string()));
306        }
307
308        let entry = slot_index.to_entry();
309        self.writer.write_entry(&entry)?;
310        self.has_written_block_slot_index = true;
311
312        Ok(())
313    }
314
315    /// Write the state slot index
316    pub fn write_state_slot_index(&mut self, slot_index: &SlotIndex) -> Result<(), E2sError> {
317        self.ensure_version_written()?;
318
319        if self.has_written_state_slot_index {
320            return Err(E2sError::Ssz("State slot index already written".to_string()));
321        }
322
323        let entry = slot_index.to_entry();
324        self.writer.write_entry(&entry)?;
325        self.has_written_state_slot_index = true;
326
327        Ok(())
328    }
329
330    /// Helper to ensure version is written before any data
331    fn ensure_version_written(&mut self) -> Result<(), E2sError> {
332        if !self.has_written_version {
333            self.write_version()?;
334        }
335        Ok(())
336    }
337}