1use crate::{
9 common::file_ops::{EraFileFormat, StreamReader, StreamWriter},
10 e2s::{
11 error::E2sError,
12 file::{E2StoreReader, E2StoreWriter},
13 types::{Entry, IndexEntry, Version, SLOT_INDEX},
14 },
15 era::types::{
16 consensus::{
17 CompressedBeaconState, CompressedSignedBeaconBlock, COMPRESSED_BEACON_STATE,
18 COMPRESSED_SIGNED_BEACON_BLOCK,
19 },
20 group::{EraGroup, EraId, SlotIndex},
21 },
22};
23use std::io::{Read, Seek, Write};
24
25#[derive(Debug)]
27pub struct EraFile {
28 pub version: Version,
30
31 pub group: EraGroup,
33
34 pub id: EraId,
36}
37
38impl EraFileFormat for EraFile {
39 type EraGroup = EraGroup;
40 type Id = EraId;
41
42 fn new(group: EraGroup, id: EraId) -> Self {
44 Self { version: Version, group, id }
45 }
46
47 fn version(&self) -> &Version {
48 &self.version
49 }
50
51 fn group(&self) -> &Self::EraGroup {
52 &self.group
53 }
54
55 fn id(&self) -> &Self::Id {
56 &self.id
57 }
58}
59
60#[derive(Debug)]
62pub struct EraReader<R: Read> {
63 reader: E2StoreReader<R>,
64}
65
66#[derive(Debug)]
68pub struct BeaconBlockIterator<R: Read> {
69 reader: E2StoreReader<R>,
70 state: Option<CompressedBeaconState>,
71 other_entries: Vec<Entry>,
72 block_slot_index: Option<SlotIndex>,
73 state_slot_index: Option<SlotIndex>,
74}
75
76impl<R: Read> BeaconBlockIterator<R> {
77 fn new(reader: E2StoreReader<R>) -> Self {
78 Self {
79 reader,
80 state: None,
81 other_entries: Default::default(),
82 block_slot_index: None,
83 state_slot_index: None,
84 }
85 }
86}
87
88impl<R: Read + Seek> Iterator for BeaconBlockIterator<R> {
89 type Item = Result<CompressedSignedBeaconBlock, E2sError>;
90
91 fn next(&mut self) -> Option<Self::Item> {
92 self.next_result().transpose()
93 }
94}
95
96impl<R: Read + Seek> BeaconBlockIterator<R> {
97 fn next_result(&mut self) -> Result<Option<CompressedSignedBeaconBlock>, E2sError> {
98 loop {
99 let Some(entry) = self.reader.read_next_entry()? else {
100 return Ok(None);
101 };
102
103 match entry.entry_type {
104 COMPRESSED_SIGNED_BEACON_BLOCK => {
105 let block = CompressedSignedBeaconBlock::from_entry(&entry)?;
106 return Ok(Some(block));
107 }
108 COMPRESSED_BEACON_STATE => {
109 if self.state.is_some() {
110 return Err(E2sError::Ssz("Multiple state entries found".to_string()));
111 }
112 self.state = Some(CompressedBeaconState::from_entry(&entry)?);
113 }
114 SLOT_INDEX => {
115 let slot_index = SlotIndex::from_entry(&entry)?;
116 if self.state.is_none() {
119 self.block_slot_index = Some(slot_index);
120 } else {
121 self.state_slot_index = Some(slot_index);
122 }
123 }
124 _ => {
125 self.other_entries.push(entry);
126 }
127 }
128 }
129 }
130}
131
132impl<R: Read + Seek> StreamReader<R> for EraReader<R> {
133 type File = EraFile;
134 type Iterator = BeaconBlockIterator<R>;
135
136 fn new(reader: R) -> Self {
138 Self { reader: E2StoreReader::new(reader) }
139 }
140
141 fn iter(self) -> BeaconBlockIterator<R> {
143 BeaconBlockIterator::new(self.reader)
144 }
145
146 fn read(self, network_name: String) -> Result<Self::File, E2sError> {
147 self.read_and_assemble(network_name)
148 }
149}
150
151impl<R: Read + Seek> EraReader<R> {
152 pub fn read_and_assemble(mut self, network_name: String) -> Result<EraFile, E2sError> {
155 let _version_entry = match self.reader.read_version()? {
157 Some(entry) if entry.is_version() => entry,
158 Some(_) => return Err(E2sError::Ssz("First entry is not a Version entry".to_string())),
159 None => return Err(E2sError::Ssz("Empty Era file".to_string())),
160 };
161
162 let mut iter = self.iter();
163 let blocks = (&mut iter).collect::<Result<Vec<_>, _>>()?;
164
165 let BeaconBlockIterator {
166 state, other_entries, block_slot_index, state_slot_index, ..
167 } = iter;
168
169 let state =
170 state.ok_or_else(|| E2sError::Ssz("Era file missing state entry".to_string()))?;
171
172 let state_slot_index = state_slot_index
173 .ok_or_else(|| E2sError::Ssz("Era file missing state slot index".to_string()))?;
174
175 let mut group = if let Some(block_index) = block_slot_index {
177 EraGroup::with_block_index(blocks, state, block_index, state_slot_index)
178 } else {
179 EraGroup::new(blocks, state, state_slot_index)
180 };
181
182 for entry in other_entries {
184 group.add_entry(entry);
185 }
186
187 let (start_slot, slot_count) = group.slot_range();
188
189 let id = EraId::new(network_name, start_slot, slot_count);
190
191 Ok(EraFile::new(group, id))
192 }
193}
194
195#[derive(Debug)]
197pub struct EraWriter<W: Write> {
198 writer: E2StoreWriter<W>,
199 has_written_version: bool,
200 has_written_state: bool,
201 has_written_block_slot_index: bool,
202 has_written_state_slot_index: bool,
203}
204
205impl<W: Write> StreamWriter<W> for EraWriter<W> {
206 type File = EraFile;
207
208 fn new(writer: W) -> Self {
210 Self {
211 writer: E2StoreWriter::new(writer),
212 has_written_version: false,
213 has_written_state: false,
214 has_written_block_slot_index: false,
215 has_written_state_slot_index: false,
216 }
217 }
218
219 fn write_version(&mut self) -> Result<(), E2sError> {
221 if self.has_written_version {
222 return Ok(());
223 }
224
225 self.writer.write_version()?;
226 self.has_written_version = true;
227 Ok(())
228 }
229
230 fn write_file(&mut self, file: &Self::File) -> Result<(), E2sError> {
231 self.write_version()?;
233
234 for block in &file.group.blocks {
236 self.write_beacon_block(block)?;
237 }
238
239 self.write_beacon_state(&file.group.era_state)?;
241
242 for entry in &file.group.other_entries {
244 self.writer.write_entry(entry)?;
245 }
246
247 if let Some(ref block_index) = file.group.slot_index {
249 self.write_block_slot_index(block_index)?;
250 }
251
252 self.write_state_slot_index(&file.group.state_slot_index)?;
254
255 self.writer.flush()?;
256 Ok(())
257 }
258
259 fn flush(&mut self) -> Result<(), E2sError> {
261 self.writer.flush()
262 }
263}
264
265impl<W: Write> EraWriter<W> {
266 pub fn write_beacon_block(
268 &mut self,
269 block: &CompressedSignedBeaconBlock,
270 ) -> Result<(), E2sError> {
271 self.ensure_version_written()?;
272
273 if self.has_written_state ||
275 self.has_written_block_slot_index ||
276 self.has_written_state_slot_index
277 {
278 return Err(E2sError::Ssz("Cannot write blocks after state or indices".to_string()));
279 }
280
281 let entry = block.to_entry();
282 self.writer.write_entry(&entry)?;
283 Ok(())
284 }
285
286 fn write_beacon_state(&mut self, state: &CompressedBeaconState) -> Result<(), E2sError> {
288 self.ensure_version_written()?;
289
290 if self.has_written_state {
291 return Err(E2sError::Ssz("State already written".to_string()));
292 }
293
294 let entry = state.to_entry();
295 self.writer.write_entry(&entry)?;
296 self.has_written_state = true;
297 Ok(())
298 }
299
300 pub fn write_block_slot_index(&mut self, slot_index: &SlotIndex) -> Result<(), E2sError> {
302 self.ensure_version_written()?;
303
304 if self.has_written_block_slot_index {
305 return Err(E2sError::Ssz("Block slot index already written".to_string()));
306 }
307
308 let entry = slot_index.to_entry();
309 self.writer.write_entry(&entry)?;
310 self.has_written_block_slot_index = true;
311
312 Ok(())
313 }
314
315 pub fn write_state_slot_index(&mut self, slot_index: &SlotIndex) -> Result<(), E2sError> {
317 self.ensure_version_written()?;
318
319 if self.has_written_state_slot_index {
320 return Err(E2sError::Ssz("State slot index already written".to_string()));
321 }
322
323 let entry = slot_index.to_entry();
324 self.writer.write_entry(&entry)?;
325 self.has_written_state_slot_index = true;
326
327 Ok(())
328 }
329
330 fn ensure_version_written(&mut self) -> Result<(), E2sError> {
332 if !self.has_written_version {
333 self.write_version()?;
334 }
335 Ok(())
336 }
337}