1use crate::{
9 common::file_ops::{EraFileFormat, FileReader, StreamReader, StreamWriter},
10 e2s::{
11 error::E2sError,
12 file::{E2StoreReader, E2StoreWriter},
13 types::{Entry, IndexEntry, Version, SLOT_INDEX},
14 },
15 era::types::{
16 consensus::{
17 CompressedBeaconState, CompressedSignedBeaconBlock, COMPRESSED_BEACON_STATE,
18 COMPRESSED_SIGNED_BEACON_BLOCK,
19 },
20 group::{EraGroup, EraId, SlotIndex},
21 },
22};
23use std::{
24 fs::File,
25 io::{Read, Seek, Write},
26};
27
28#[derive(Debug)]
30pub struct EraFile {
31 pub version: Version,
33
34 pub group: EraGroup,
36
37 pub id: EraId,
39}
40
41impl EraFileFormat for EraFile {
42 type EraGroup = EraGroup;
43 type Id = EraId;
44
45 fn new(group: EraGroup, id: EraId) -> Self {
47 Self { version: Version, group, id }
48 }
49
50 fn version(&self) -> &Version {
51 &self.version
52 }
53
54 fn group(&self) -> &Self::EraGroup {
55 &self.group
56 }
57
58 fn id(&self) -> &Self::Id {
59 &self.id
60 }
61}
62
63#[derive(Debug)]
65pub struct EraReader<R: Read> {
66 reader: E2StoreReader<R>,
67}
68
69#[derive(Debug)]
71pub struct BeaconBlockIterator<R: Read> {
72 reader: E2StoreReader<R>,
73 state: Option<CompressedBeaconState>,
74 other_entries: Vec<Entry>,
75 block_slot_index: Option<SlotIndex>,
76 state_slot_index: Option<SlotIndex>,
77}
78
79impl<R: Read> BeaconBlockIterator<R> {
80 fn new(reader: E2StoreReader<R>) -> Self {
81 Self {
82 reader,
83 state: None,
84 other_entries: Default::default(),
85 block_slot_index: None,
86 state_slot_index: None,
87 }
88 }
89}
90
91impl<R: Read + Seek> Iterator for BeaconBlockIterator<R> {
92 type Item = Result<CompressedSignedBeaconBlock, E2sError>;
93
94 fn next(&mut self) -> Option<Self::Item> {
95 self.next_result().transpose()
96 }
97}
98
99impl<R: Read + Seek> BeaconBlockIterator<R> {
100 fn next_result(&mut self) -> Result<Option<CompressedSignedBeaconBlock>, E2sError> {
101 loop {
102 let Some(entry) = self.reader.read_next_entry()? else {
103 return Ok(None);
104 };
105
106 match entry.entry_type {
107 COMPRESSED_SIGNED_BEACON_BLOCK => {
108 let block = CompressedSignedBeaconBlock::from_entry(&entry)?;
109 return Ok(Some(block));
110 }
111 COMPRESSED_BEACON_STATE => {
112 if self.state.is_some() {
113 return Err(E2sError::Ssz("Multiple state entries found".to_string()));
114 }
115 self.state = Some(CompressedBeaconState::from_entry(&entry)?);
116 }
117 SLOT_INDEX => {
118 let slot_index = SlotIndex::from_entry(&entry)?;
119 if self.state.is_none() {
122 self.block_slot_index = Some(slot_index);
123 } else {
124 self.state_slot_index = Some(slot_index);
125 }
126 }
127 _ => {
128 self.other_entries.push(entry);
129 }
130 }
131 }
132 }
133}
134
135impl<R: Read + Seek> StreamReader<R> for EraReader<R> {
136 type File = EraFile;
137 type Iterator = BeaconBlockIterator<R>;
138
139 fn new(reader: R) -> Self {
141 Self { reader: E2StoreReader::new(reader) }
142 }
143
144 fn iter(self) -> BeaconBlockIterator<R> {
146 BeaconBlockIterator::new(self.reader)
147 }
148
149 fn read(self, network_name: String) -> Result<Self::File, E2sError> {
150 self.read_and_assemble(network_name)
151 }
152}
153
154impl<R: Read + Seek> EraReader<R> {
155 pub fn read_and_assemble(mut self, network_name: String) -> Result<EraFile, E2sError> {
158 let _version_entry = match self.reader.read_version()? {
160 Some(entry) if entry.is_version() => entry,
161 Some(_) => return Err(E2sError::Ssz("First entry is not a Version entry".to_string())),
162 None => return Err(E2sError::Ssz("Empty Era file".to_string())),
163 };
164
165 let mut iter = self.iter();
166 let blocks = (&mut iter).collect::<Result<Vec<_>, _>>()?;
167
168 let BeaconBlockIterator {
169 state, other_entries, block_slot_index, state_slot_index, ..
170 } = iter;
171
172 let state =
173 state.ok_or_else(|| E2sError::Ssz("Era file missing state entry".to_string()))?;
174
175 let state_slot_index = state_slot_index
176 .ok_or_else(|| E2sError::Ssz("Era file missing state slot index".to_string()))?;
177
178 let mut group = if let Some(block_index) = block_slot_index {
180 EraGroup::with_block_index(blocks, state, block_index, state_slot_index)
181 } else {
182 EraGroup::new(blocks, state, state_slot_index)
183 };
184
185 for entry in other_entries {
187 group.add_entry(entry);
188 }
189
190 let (start_slot, slot_count) = group.slot_range();
191
192 let id = EraId::new(network_name, start_slot, slot_count);
193
194 Ok(EraFile::new(group, id))
195 }
196}
197
198impl FileReader for EraReader<File> {}
199
200#[derive(Debug)]
202pub struct EraWriter<W: Write> {
203 writer: E2StoreWriter<W>,
204 has_written_version: bool,
205 has_written_state: bool,
206 has_written_block_slot_index: bool,
207 has_written_state_slot_index: bool,
208}
209
210impl<W: Write> StreamWriter<W> for EraWriter<W> {
211 type File = EraFile;
212
213 fn new(writer: W) -> Self {
215 Self {
216 writer: E2StoreWriter::new(writer),
217 has_written_version: false,
218 has_written_state: false,
219 has_written_block_slot_index: false,
220 has_written_state_slot_index: false,
221 }
222 }
223
224 fn write_version(&mut self) -> Result<(), E2sError> {
226 if self.has_written_version {
227 return Ok(());
228 }
229
230 self.writer.write_version()?;
231 self.has_written_version = true;
232 Ok(())
233 }
234
235 fn write_file(&mut self, file: &Self::File) -> Result<(), E2sError> {
236 self.write_version()?;
238
239 for block in &file.group.blocks {
241 self.write_beacon_block(block)?;
242 }
243
244 self.write_beacon_state(&file.group.era_state)?;
246
247 for entry in &file.group.other_entries {
249 self.writer.write_entry(entry)?;
250 }
251
252 if let Some(ref block_index) = file.group.slot_index {
254 self.write_block_slot_index(block_index)?;
255 }
256
257 self.write_state_slot_index(&file.group.state_slot_index)?;
259
260 self.writer.flush()?;
261 Ok(())
262 }
263
264 fn flush(&mut self) -> Result<(), E2sError> {
266 self.writer.flush()
267 }
268}
269
270impl<W: Write> EraWriter<W> {
271 pub fn write_beacon_block(
273 &mut self,
274 block: &CompressedSignedBeaconBlock,
275 ) -> Result<(), E2sError> {
276 self.ensure_version_written()?;
277
278 if self.has_written_state ||
280 self.has_written_block_slot_index ||
281 self.has_written_state_slot_index
282 {
283 return Err(E2sError::Ssz("Cannot write blocks after state or indices".to_string()));
284 }
285
286 let entry = block.to_entry();
287 self.writer.write_entry(&entry)?;
288 Ok(())
289 }
290
291 fn write_beacon_state(&mut self, state: &CompressedBeaconState) -> Result<(), E2sError> {
293 self.ensure_version_written()?;
294
295 if self.has_written_state {
296 return Err(E2sError::Ssz("State already written".to_string()));
297 }
298
299 let entry = state.to_entry();
300 self.writer.write_entry(&entry)?;
301 self.has_written_state = true;
302 Ok(())
303 }
304
305 pub fn write_block_slot_index(&mut self, slot_index: &SlotIndex) -> Result<(), E2sError> {
307 self.ensure_version_written()?;
308
309 if self.has_written_block_slot_index {
310 return Err(E2sError::Ssz("Block slot index already written".to_string()));
311 }
312
313 let entry = slot_index.to_entry();
314 self.writer.write_entry(&entry)?;
315 self.has_written_block_slot_index = true;
316
317 Ok(())
318 }
319
320 pub fn write_state_slot_index(&mut self, slot_index: &SlotIndex) -> Result<(), E2sError> {
322 self.ensure_version_written()?;
323
324 if self.has_written_state_slot_index {
325 return Err(E2sError::Ssz("State slot index already written".to_string()));
326 }
327
328 let entry = slot_index.to_entry();
329 self.writer.write_entry(&entry)?;
330 self.has_written_state_slot_index = true;
331
332 Ok(())
333 }
334
335 fn ensure_version_written(&mut self) -> Result<(), E2sError> {
337 if !self.has_written_version {
338 self.write_version()?;
339 }
340 Ok(())
341 }
342}