reth_era/era/
file.rs

1//! Represents a complete Era file
2//!
3//! The structure of an Era file follows the specification:
4//! `Version | block* | era-state | other-entries* | slot-index(block)? | slot-index(state)`
5//!
6//! See also <https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era.md>.
7
8use crate::{
9    common::file_ops::{EraFileFormat, FileReader, StreamReader, StreamWriter},
10    e2s::{
11        error::E2sError,
12        file::{E2StoreReader, E2StoreWriter},
13        types::{Entry, IndexEntry, Version, SLOT_INDEX},
14    },
15    era::types::{
16        consensus::{
17            CompressedBeaconState, CompressedSignedBeaconBlock, COMPRESSED_BEACON_STATE,
18            COMPRESSED_SIGNED_BEACON_BLOCK,
19        },
20        group::{EraGroup, EraId, SlotIndex},
21    },
22};
23use std::{
24    fs::File,
25    io::{Read, Seek, Write},
26};
27
28/// Era file interface
29#[derive(Debug)]
30pub struct EraFile {
31    /// Version record, must be the first record in the file
32    pub version: Version,
33
34    /// Main content group of the Era file
35    pub group: EraGroup,
36
37    /// File identifier
38    pub id: EraId,
39}
40
41impl EraFileFormat for EraFile {
42    type EraGroup = EraGroup;
43    type Id = EraId;
44
45    /// Create a new [`EraFile`]
46    fn new(group: EraGroup, id: EraId) -> Self {
47        Self { version: Version, group, id }
48    }
49
50    fn version(&self) -> &Version {
51        &self.version
52    }
53
54    fn group(&self) -> &Self::EraGroup {
55        &self.group
56    }
57
58    fn id(&self) -> &Self::Id {
59        &self.id
60    }
61}
62
63/// Reader for era files that builds on top of [`E2StoreReader`]
64#[derive(Debug)]
65pub struct EraReader<R: Read> {
66    reader: E2StoreReader<R>,
67}
68
69/// An iterator of [`BeaconBlockIterator`] streaming from [`E2StoreReader`].
70#[derive(Debug)]
71pub struct BeaconBlockIterator<R: Read> {
72    reader: E2StoreReader<R>,
73    state: Option<CompressedBeaconState>,
74    other_entries: Vec<Entry>,
75    block_slot_index: Option<SlotIndex>,
76    state_slot_index: Option<SlotIndex>,
77}
78
79impl<R: Read> BeaconBlockIterator<R> {
80    fn new(reader: E2StoreReader<R>) -> Self {
81        Self {
82            reader,
83            state: None,
84            other_entries: Default::default(),
85            block_slot_index: None,
86            state_slot_index: None,
87        }
88    }
89}
90
91impl<R: Read + Seek> Iterator for BeaconBlockIterator<R> {
92    type Item = Result<CompressedSignedBeaconBlock, E2sError>;
93
94    fn next(&mut self) -> Option<Self::Item> {
95        self.next_result().transpose()
96    }
97}
98
99impl<R: Read + Seek> BeaconBlockIterator<R> {
100    fn next_result(&mut self) -> Result<Option<CompressedSignedBeaconBlock>, E2sError> {
101        loop {
102            let Some(entry) = self.reader.read_next_entry()? else {
103                return Ok(None);
104            };
105
106            match entry.entry_type {
107                COMPRESSED_SIGNED_BEACON_BLOCK => {
108                    let block = CompressedSignedBeaconBlock::from_entry(&entry)?;
109                    return Ok(Some(block));
110                }
111                COMPRESSED_BEACON_STATE => {
112                    if self.state.is_some() {
113                        return Err(E2sError::Ssz("Multiple state entries found".to_string()));
114                    }
115                    self.state = Some(CompressedBeaconState::from_entry(&entry)?);
116                }
117                SLOT_INDEX => {
118                    let slot_index = SlotIndex::from_entry(&entry)?;
119                    // if we haven't seen the state yet, the slot index is for blocks,
120                    // if we have seen the state, the slot index is for the state
121                    if self.state.is_none() {
122                        self.block_slot_index = Some(slot_index);
123                    } else {
124                        self.state_slot_index = Some(slot_index);
125                    }
126                }
127                _ => {
128                    self.other_entries.push(entry);
129                }
130            }
131        }
132    }
133}
134
135impl<R: Read + Seek> StreamReader<R> for EraReader<R> {
136    type File = EraFile;
137    type Iterator = BeaconBlockIterator<R>;
138
139    /// Create a new [`EraReader`]
140    fn new(reader: R) -> Self {
141        Self { reader: E2StoreReader::new(reader) }
142    }
143
144    /// Returns an iterator of [`BeaconBlockIterator`] streaming from `reader`.
145    fn iter(self) -> BeaconBlockIterator<R> {
146        BeaconBlockIterator::new(self.reader)
147    }
148
149    fn read(self, network_name: String) -> Result<Self::File, E2sError> {
150        self.read_and_assemble(network_name)
151    }
152}
153
154impl<R: Read + Seek> EraReader<R> {
155    /// Reads and parses an era file from the underlying reader, assembling all components
156    /// into a complete [`EraFile`] with an [`EraId`] that includes the provided network name.
157    pub fn read_and_assemble(mut self, network_name: String) -> Result<EraFile, E2sError> {
158        // Validate version entry
159        let _version_entry = match self.reader.read_version()? {
160            Some(entry) if entry.is_version() => entry,
161            Some(_) => return Err(E2sError::Ssz("First entry is not a Version entry".to_string())),
162            None => return Err(E2sError::Ssz("Empty Era file".to_string())),
163        };
164
165        let mut iter = self.iter();
166        let blocks = (&mut iter).collect::<Result<Vec<_>, _>>()?;
167
168        let BeaconBlockIterator {
169            state, other_entries, block_slot_index, state_slot_index, ..
170        } = iter;
171
172        let state =
173            state.ok_or_else(|| E2sError::Ssz("Era file missing state entry".to_string()))?;
174
175        let state_slot_index = state_slot_index
176            .ok_or_else(|| E2sError::Ssz("Era file missing state slot index".to_string()))?;
177
178        // Create appropriate `EraGroup`, genesis vs non-genesis
179        let mut group = if let Some(block_index) = block_slot_index {
180            EraGroup::with_block_index(blocks, state, block_index, state_slot_index)
181        } else {
182            EraGroup::new(blocks, state, state_slot_index)
183        };
184
185        // Add other entries
186        for entry in other_entries {
187            group.add_entry(entry);
188        }
189
190        let (start_slot, slot_count) = group.slot_range();
191
192        let id = EraId::new(network_name, start_slot, slot_count);
193
194        Ok(EraFile::new(group, id))
195    }
196}
197
198impl FileReader for EraReader<File> {}
199
200/// Writer for Era files that builds on top of [`E2StoreWriter`]
201#[derive(Debug)]
202pub struct EraWriter<W: Write> {
203    writer: E2StoreWriter<W>,
204    has_written_version: bool,
205    has_written_state: bool,
206    has_written_block_slot_index: bool,
207    has_written_state_slot_index: bool,
208}
209
210impl<W: Write> StreamWriter<W> for EraWriter<W> {
211    type File = EraFile;
212
213    /// Create a new [`EraWriter`]
214    fn new(writer: W) -> Self {
215        Self {
216            writer: E2StoreWriter::new(writer),
217            has_written_version: false,
218            has_written_state: false,
219            has_written_block_slot_index: false,
220            has_written_state_slot_index: false,
221        }
222    }
223
224    /// Write the version entry
225    fn write_version(&mut self) -> Result<(), E2sError> {
226        if self.has_written_version {
227            return Ok(());
228        }
229
230        self.writer.write_version()?;
231        self.has_written_version = true;
232        Ok(())
233    }
234
235    fn write_file(&mut self, file: &Self::File) -> Result<(), E2sError> {
236        // Write version
237        self.write_version()?;
238
239        // Write all blocks
240        for block in &file.group.blocks {
241            self.write_beacon_block(block)?;
242        }
243
244        // Write state
245        self.write_beacon_state(&file.group.era_state)?;
246
247        // Write other entries
248        for entry in &file.group.other_entries {
249            self.writer.write_entry(entry)?;
250        }
251
252        // Write slot index
253        if let Some(ref block_index) = file.group.slot_index {
254            self.write_block_slot_index(block_index)?;
255        }
256
257        // Write state index
258        self.write_state_slot_index(&file.group.state_slot_index)?;
259
260        self.writer.flush()?;
261        Ok(())
262    }
263
264    /// Flush any buffered data to the underlying writer
265    fn flush(&mut self) -> Result<(), E2sError> {
266        self.writer.flush()
267    }
268}
269
270impl<W: Write> EraWriter<W> {
271    /// Write beacon block
272    pub fn write_beacon_block(
273        &mut self,
274        block: &CompressedSignedBeaconBlock,
275    ) -> Result<(), E2sError> {
276        self.ensure_version_written()?;
277
278        // Ensure blocks are written before state/indices
279        if self.has_written_state ||
280            self.has_written_block_slot_index ||
281            self.has_written_state_slot_index
282        {
283            return Err(E2sError::Ssz("Cannot write blocks after state or indices".to_string()));
284        }
285
286        let entry = block.to_entry();
287        self.writer.write_entry(&entry)?;
288        Ok(())
289    }
290
291    // Write beacon state
292    fn write_beacon_state(&mut self, state: &CompressedBeaconState) -> Result<(), E2sError> {
293        self.ensure_version_written()?;
294
295        if self.has_written_state {
296            return Err(E2sError::Ssz("State already written".to_string()));
297        }
298
299        let entry = state.to_entry();
300        self.writer.write_entry(&entry)?;
301        self.has_written_state = true;
302        Ok(())
303    }
304
305    /// Write the block slot index
306    pub fn write_block_slot_index(&mut self, slot_index: &SlotIndex) -> Result<(), E2sError> {
307        self.ensure_version_written()?;
308
309        if self.has_written_block_slot_index {
310            return Err(E2sError::Ssz("Block slot index already written".to_string()));
311        }
312
313        let entry = slot_index.to_entry();
314        self.writer.write_entry(&entry)?;
315        self.has_written_block_slot_index = true;
316
317        Ok(())
318    }
319
320    /// Write the state slot index
321    pub fn write_state_slot_index(&mut self, slot_index: &SlotIndex) -> Result<(), E2sError> {
322        self.ensure_version_written()?;
323
324        if self.has_written_state_slot_index {
325            return Err(E2sError::Ssz("State slot index already written".to_string()));
326        }
327
328        let entry = slot_index.to_entry();
329        self.writer.write_entry(&entry)?;
330        self.has_written_state_slot_index = true;
331
332        Ok(())
333    }
334
335    /// Helper to ensure version is written before any data
336    fn ensure_version_written(&mut self) -> Result<(), E2sError> {
337        if !self.has_written_version {
338            self.write_version()?;
339        }
340        Ok(())
341    }
342}