use crate::{
    e2s_file::{E2StoreReader, E2StoreWriter},
    e2s_types::{E2sError, Entry, IndexEntry, Version},
    era1_types::{BlockIndex, Era1Group, Era1Id, BLOCK_INDEX},
    era_file_ops::{EraFileFormat, FileReader, StreamReader, StreamWriter},
    execution_types::{
        self, Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
        TotalDifficulty, MAX_BLOCKS_PER_ERA1,
    },
};
use alloy_primitives::BlockNumber;
use std::{
    collections::VecDeque,
    fs::File,
    io::{Read, Seek, Write},
};

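/// An Era1 file assembled in memory: the version entry, the group of block data, and the
/// identifier describing which network and block range it covers.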
#[derive(Debug)]
pub struct Era1File {
    /// The version entry; must be the first entry in the file
    pub version: Version,

    /// The group of blocks, accumulator and block index contained in the file
    pub group: Era1Group,

    /// Identifier for this file: network name, starting block and block count
    pub id: Era1Id,
}

impl EraFileFormat for Era1File {
    type EraGroup = Era1Group;
    type Id = Era1Id;

    fn new(group: Era1Group, id: Era1Id) -> Self {
        Self { version: Version, group, id }
    }

    fn version(&self) -> &Version {
        &self.version
    }

    fn group(&self) -> &Self::EraGroup {
        &self.group
    }

    fn id(&self) -> &Self::Id {
        &self.id
    }
}

impl Era1File {
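    /// Returns the block with the given number, or `None` if it is not contained in this file.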
    pub fn get_block_by_number(&self, number: BlockNumber) -> Option<&BlockTuple> {
        // Use `checked_sub` so numbers below the starting block yield `None` instead of
        // underflowing.
        let index = number.checked_sub(self.group.block_index.starting_number())? as usize;
        self.group.blocks.get(index)
    }

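    /// Returns the inclusive range of block numbers contained in this file.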
    pub fn block_range(&self) -> std::ops::RangeInclusive<BlockNumber> {
        let start = self.group.block_index.starting_number();
        let end = start + (self.group.blocks.len() as u64) - 1;
        start..=end
    }

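    /// Returns `true` if the given block number falls within this file's range.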
    pub fn contains_block(&self, number: BlockNumber) -> bool {
        self.block_range().contains(&number)
    }
}

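/// Reader for Era1 files that wraps an [`E2StoreReader`].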
#[derive(Debug)]
pub struct Era1Reader<R: Read> {
    reader: E2StoreReader<R>,
}

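/// Streaming iterator that decodes e2store entries into [`BlockTuple`]s, buffering headers,
/// bodies, receipts and total difficulties until a complete tuple can be yielded, and collecting
/// the accumulator, block index and any other entries it encounters along the way.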
#[derive(Debug)]
pub struct BlockTupleIterator<R: Read> {
    reader: E2StoreReader<R>,
    headers: VecDeque<CompressedHeader>,
    bodies: VecDeque<CompressedBody>,
    receipts: VecDeque<CompressedReceipts>,
    difficulties: VecDeque<TotalDifficulty>,
    other_entries: Vec<Entry>,
    accumulator: Option<Accumulator>,
    block_index: Option<BlockIndex>,
}

impl<R: Read> BlockTupleIterator<R> {
    fn new(reader: E2StoreReader<R>) -> Self {
        Self {
            reader,
            headers: Default::default(),
            bodies: Default::default(),
            receipts: Default::default(),
            difficulties: Default::default(),
            other_entries: Default::default(),
            accumulator: None,
            block_index: None,
        }
    }
}

impl<R: Read + Seek> Iterator for BlockTupleIterator<R> {
    type Item = Result<BlockTuple, E2sError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_result().transpose()
    }
}

impl<R: Read + Seek> BlockTupleIterator<R> {
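    /// Reads entries until a complete block tuple can be assembled, or until the underlying
    /// reader is exhausted.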
    fn next_result(&mut self) -> Result<Option<BlockTuple>, E2sError> {
        loop {
            let Some(entry) = self.reader.read_next_entry()? else {
                return Ok(None);
            };

            match entry.entry_type {
                execution_types::COMPRESSED_HEADER => {
                    self.headers.push_back(CompressedHeader::from_entry(&entry)?);
                }
                execution_types::COMPRESSED_BODY => {
                    self.bodies.push_back(CompressedBody::from_entry(&entry)?);
                }
                execution_types::COMPRESSED_RECEIPTS => {
                    self.receipts.push_back(CompressedReceipts::from_entry(&entry)?);
                }
                execution_types::TOTAL_DIFFICULTY => {
                    self.difficulties.push_back(TotalDifficulty::from_entry(&entry)?);
                }
                execution_types::ACCUMULATOR => {
                    if self.accumulator.is_some() {
                        return Err(E2sError::Ssz("Multiple accumulator entries found".to_string()));
                    }
                    self.accumulator = Some(Accumulator::from_entry(&entry)?);
                }
                BLOCK_INDEX => {
                    if self.block_index.is_some() {
                        return Err(E2sError::Ssz("Multiple block index entries found".to_string()));
                    }
                    self.block_index = Some(BlockIndex::from_entry(&entry)?);
                }
                _ => {
                    self.other_entries.push(entry);
                }
            }

            // A complete tuple is available once one of each component has been buffered.
            if !self.headers.is_empty() &&
                !self.bodies.is_empty() &&
                !self.receipts.is_empty() &&
                !self.difficulties.is_empty()
            {
                let header = self.headers.pop_front().unwrap();
                let body = self.bodies.pop_front().unwrap();
                let receipt = self.receipts.pop_front().unwrap();
                let difficulty = self.difficulties.pop_front().unwrap();

                return Ok(Some(BlockTuple::new(header, body, receipt, difficulty)));
            }
        }
    }
}

impl<R: Read + Seek> StreamReader<R> for Era1Reader<R> {
    type File = Era1File;
    type Iterator = BlockTupleIterator<R>;

    fn new(reader: R) -> Self {
        Self { reader: E2StoreReader::new(reader) }
    }

    fn iter(self) -> BlockTupleIterator<R> {
        BlockTupleIterator::new(self.reader)
    }

    fn read(self, network_name: String) -> Result<Self::File, E2sError> {
        self.read_and_assemble(network_name)
    }
}

impl<R: Read + Seek> Era1Reader<R> {
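    /// Reads the whole stream and assembles a complete [`Era1File`]: the leading version entry
    /// is validated, all block tuples are collected, component counts are checked for
    /// consistency, and the accumulator and block index entries must both be present.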
    pub fn read_and_assemble(mut self, network_name: String) -> Result<Era1File, E2sError> {
        let _version_entry = match self.reader.read_version()? {
            Some(entry) if entry.is_version() => entry,
            Some(_) => return Err(E2sError::Ssz("First entry is not a Version entry".to_string())),
            None => return Err(E2sError::Ssz("Empty Era1 file".to_string())),
        };

        let mut iter = self.iter();
        let blocks = (&mut iter).collect::<Result<Vec<_>, _>>()?;

        let BlockTupleIterator {
            headers,
            bodies,
            receipts,
            difficulties,
            other_entries,
            accumulator,
            block_index,
            ..
        } = iter;

        if headers.len() != bodies.len() ||
            headers.len() != receipts.len() ||
            headers.len() != difficulties.len()
        {
            return Err(E2sError::Ssz(format!(
                "Mismatched block component counts: headers={}, bodies={}, receipts={}, difficulties={}",
                headers.len(), bodies.len(), receipts.len(), difficulties.len()
            )));
        }

        let accumulator = accumulator
            .ok_or_else(|| E2sError::Ssz("Era1 file missing accumulator entry".to_string()))?;

        let block_index = block_index
            .ok_or_else(|| E2sError::Ssz("Era1 file missing block index entry".to_string()))?;

        let mut group = Era1Group::new(blocks, accumulator, block_index.clone());

        for entry in other_entries {
            group.add_entry(entry);
        }

        let id = Era1Id::new(
            network_name,
            block_index.starting_number(),
            block_index.offsets().len() as u32,
        );

        Ok(Era1File::new(group, id))
    }
}

impl FileReader for Era1Reader<File> {}

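/// Writer for Era1 files that wraps an [`E2StoreWriter`] and tracks which sections have already
/// been written, so entries are emitted in the required order (version, blocks, accumulator,
/// block index).
///
/// A minimal in-memory round trip, mirroring the tests below and assuming an already assembled
/// `era1_file: Era1File`:
///
/// ```ignore
/// let mut buffer = Vec::new();
/// let mut writer = Era1Writer::new(&mut buffer);
/// writer.write_file(&era1_file)?;
///
/// let reader = Era1Reader::new(std::io::Cursor::new(&buffer));
/// let read_back = reader.read("testnet".to_string())?;
/// ```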
#[derive(Debug)]
pub struct Era1Writer<W: Write> {
    writer: E2StoreWriter<W>,
    has_written_version: bool,
    has_written_blocks: bool,
    has_written_accumulator: bool,
    has_written_block_index: bool,
}

impl<W: Write> StreamWriter<W> for Era1Writer<W> {
    type File = Era1File;

    fn new(writer: W) -> Self {
        Self {
            writer: E2StoreWriter::new(writer),
            has_written_version: false,
            has_written_blocks: false,
            has_written_accumulator: false,
            has_written_block_index: false,
        }
    }

    fn write_version(&mut self) -> Result<(), E2sError> {
        if self.has_written_version {
            return Ok(());
        }

        self.writer.write_version()?;
        self.has_written_version = true;
        Ok(())
    }

    fn write_file(&mut self, era1_file: &Era1File) -> Result<(), E2sError> {
        self.write_version()?;

        if era1_file.group.blocks.len() > MAX_BLOCKS_PER_ERA1 {
            return Err(E2sError::Ssz("Era1 file cannot contain more than 8192 blocks".to_string()));
        }

        // Write all block tuples
        for block in &era1_file.group.blocks {
            self.write_block(block)?;
        }

        // Write any other entries
        for entry in &era1_file.group.other_entries {
            self.writer.write_entry(entry)?;
        }

        // Write the accumulator, then the block index, which must come last
        self.write_accumulator(&era1_file.group.accumulator)?;

        self.write_block_index(&era1_file.group.block_index)?;

        self.writer.flush()?;

        Ok(())
    }

    fn flush(&mut self) -> Result<(), E2sError> {
        self.writer.flush()
    }
}

impl<W: Write> Era1Writer<W> {
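    /// Writes a single block tuple as four consecutive entries: compressed header, compressed
    /// body, compressed receipts and total difficulty. Blocks cannot be written once the
    /// accumulator or block index has been written.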
    pub fn write_block(&mut self, block_tuple: &BlockTuple) -> Result<(), E2sError> {
        if !self.has_written_version {
            self.write_version()?;
        }

        if self.has_written_accumulator || self.has_written_block_index {
            return Err(E2sError::Ssz(
                "Cannot write blocks after accumulator or block index".to_string(),
            ));
        }

        let header_entry = block_tuple.header.to_entry();
        self.writer.write_entry(&header_entry)?;

        let body_entry = block_tuple.body.to_entry();
        self.writer.write_entry(&body_entry)?;

        let receipts_entry = block_tuple.receipts.to_entry();
        self.writer.write_entry(&receipts_entry)?;

        let difficulty_entry = block_tuple.total_difficulty.to_entry();
        self.writer.write_entry(&difficulty_entry)?;

        self.has_written_blocks = true;

        Ok(())
    }

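    /// Writes the block index entry. The block index may only be written once.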
    pub fn write_block_index(&mut self, block_index: &BlockIndex) -> Result<(), E2sError> {
        if !self.has_written_version {
            self.write_version()?;
        }

        if self.has_written_block_index {
            return Err(E2sError::Ssz("Block index already written".to_string()));
        }

        let block_index_entry = block_index.to_entry();
        self.writer.write_entry(&block_index_entry)?;
        self.has_written_block_index = true;

        Ok(())
    }

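    /// Writes the accumulator entry. The accumulator may only be written once and must be
    /// written before the block index.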
    pub fn write_accumulator(&mut self, accumulator: &Accumulator) -> Result<(), E2sError> {
        if !self.has_written_version {
            self.write_version()?;
        }

        if self.has_written_accumulator {
            return Err(E2sError::Ssz("Accumulator already written".to_string()));
        }

        if self.has_written_block_index {
            return Err(E2sError::Ssz("Cannot write accumulator after block index".to_string()));
        }

        let accumulator_entry = accumulator.to_entry();
        self.writer.write_entry(&accumulator_entry)?;
        self.has_written_accumulator = true;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        era_file_ops::FileWriter,
        execution_types::{
            Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
            TotalDifficulty,
        },
    };
    use alloy_primitives::{B256, U256};
    use std::io::Cursor;
    use tempfile::tempdir;

    /// Creates a block tuple whose header, body and receipts data are derived from the block
    /// number, so round-trip tests can verify contents deterministically.
    fn create_test_block(number: BlockNumber, data_size: usize) -> BlockTuple {
        let header_data = vec![(number % 256) as u8; data_size];
        let header = CompressedHeader::new(header_data);

        let body_data = vec![((number + 1) % 256) as u8; data_size * 2];
        let body = CompressedBody::new(body_data);

        let receipts_data = vec![((number + 2) % 256) as u8; data_size];
        let receipts = CompressedReceipts::new(receipts_data);

        let difficulty = TotalDifficulty::new(U256::from(number * 1000));

        BlockTuple::new(header, body, receipts, difficulty)
    }

    /// Builds a complete `Era1File` with `block_count` test blocks starting at `start_block`.
    fn create_test_era1_file(
        start_block: BlockNumber,
        block_count: usize,
        network: &str,
    ) -> Era1File {
        let mut blocks = Vec::with_capacity(block_count);
        for i in 0..block_count {
            let block_num = start_block + i as u64;
            blocks.push(create_test_block(block_num, 32));
        }

        let accumulator = Accumulator::new(B256::from([0xAA; 32]));

        let mut offsets = Vec::with_capacity(block_count);
        for i in 0..block_count {
            offsets.push(i as i64 * 100);
        }
        let block_index = BlockIndex::new(start_block, offsets);
        let group = Era1Group::new(blocks, accumulator, block_index);
        let id = Era1Id::new(network, start_block, block_count as u32);

        Era1File::new(group, id)
    }

    #[test]
    fn test_era1_roundtrip_memory() -> Result<(), E2sError> {
        let start_block = 1000;
        let era1_file = create_test_era1_file(start_block, 5, "testnet");

        let mut buffer = Vec::new();
        {
            let mut writer = Era1Writer::new(&mut buffer);
            writer.write_file(&era1_file)?;
        }

        let reader = Era1Reader::new(Cursor::new(&buffer));
        let read_era1 = reader.read("testnet".to_string())?;

        assert_eq!(read_era1.id.network_name, "testnet");
        assert_eq!(read_era1.id.start_block, 1000);
        assert_eq!(read_era1.id.block_count, 5);
        assert_eq!(read_era1.group.blocks.len(), 5);

        assert_eq!(read_era1.group.blocks[0].total_difficulty.value, U256::from(1000 * 1000));
        assert_eq!(read_era1.group.blocks[1].total_difficulty.value, U256::from(1001 * 1000));

        assert_eq!(read_era1.group.blocks[0].header.data, vec![(start_block % 256) as u8; 32]);
        assert_eq!(read_era1.group.blocks[0].body.data, vec![((start_block + 1) % 256) as u8; 64]);
        assert_eq!(
            read_era1.group.blocks[0].receipts.data,
            vec![((start_block + 2) % 256) as u8; 32]
        );

        assert!(read_era1.contains_block(1000));
        assert!(read_era1.contains_block(1004));
        assert!(!read_era1.contains_block(999));
        assert!(!read_era1.contains_block(1005));

        let block_1002 = read_era1.get_block_by_number(1002);
        assert!(block_1002.is_some());
        assert_eq!(block_1002.unwrap().header.data, vec![((start_block + 2) % 256) as u8; 32]);

        Ok(())
    }

    #[test]
    fn test_era1_roundtrip_file() -> Result<(), E2sError> {
        let temp_dir = tempdir().expect("Failed to create temp directory");
        let file_path = temp_dir.path().join("test_roundtrip.era1");

        let era1_file = create_test_era1_file(2000, 3, "mainnet");
        Era1Writer::create(&file_path, &era1_file)?;

        let read_era1 = Era1Reader::open(&file_path, "mainnet")?;

        assert_eq!(read_era1.id.network_name, "mainnet");
        assert_eq!(read_era1.id.start_block, 2000);
        assert_eq!(read_era1.id.block_count, 3);
        assert_eq!(read_era1.group.blocks.len(), 3);

        for i in 0..3 {
            let block_num = 2000 + i as u64;
            let block = read_era1.get_block_by_number(block_num);
            assert!(block.is_some());
            assert_eq!(block.unwrap().header.data, vec![block_num as u8; 32]);
        }

        Ok(())
    }
}