1use crate::{
10 common::file_ops::{EraFileFormat, StreamReader, StreamWriter},
11 e2s::{
12 error::E2sError,
13 file::{E2StoreReader, E2StoreWriter},
14 types::{Entry, Version},
15 },
16 ere::types::{
17 execution::{
18 Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedSlimReceipts,
19 Proof, TotalDifficulty, ACCUMULATOR, COMPRESSED_BODY, COMPRESSED_HEADER,
20 COMPRESSED_SLIM_RECEIPTS, MAX_BLOCKS_PER_ERE, PROOF, TOTAL_DIFFICULTY,
21 },
22 group::{DynamicBlockIndex, EreGroup, EreId, DYNAMIC_BLOCK_INDEX},
23 },
24};
25use std::{
26 collections::VecDeque,
27 io::{Read, Seek, Write},
28};
29
30#[derive(Debug)]
32pub struct EreFile {
33 pub version: Version,
35
36 pub group: EreGroup,
38
39 pub id: EreId,
41}
42
43impl EraFileFormat for EreFile {
44 type EraGroup = EreGroup;
45 type Id = EreId;
46
47 fn new(group: EreGroup, id: EreId) -> Self {
49 Self { version: Version, group, id }
50 }
51
52 fn version(&self) -> &Version {
53 &self.version
54 }
55
56 fn group(&self) -> &Self::EraGroup {
57 &self.group
58 }
59
60 fn id(&self) -> &Self::Id {
61 &self.id
62 }
63}
64
65#[derive(Debug)]
67pub struct EreReader<R: Read> {
68 reader: E2StoreReader<R>,
69}
70
71#[derive(Debug)]
77pub struct EreBlockTupleIterator<R: Read> {
78 reader: E2StoreReader<R>,
79 blocks: VecDeque<BlockTuple>,
80 accumulator: Option<Accumulator>,
81 index: Option<DynamicBlockIndex>,
82 other_entries: Vec<Entry>,
83 loaded: bool,
84}
85
86impl<R: Read> EreBlockTupleIterator<R> {
87 const fn new(reader: E2StoreReader<R>) -> Self {
88 Self {
89 reader,
90 blocks: VecDeque::new(),
91 accumulator: None,
92 index: None,
93 other_entries: Vec::new(),
94 loaded: false,
95 }
96 }
97}
98
99impl<R: Read + Seek> EreBlockTupleIterator<R> {
100 fn load(&mut self) -> Result<(), E2sError> {
102 if self.loaded {
103 return Ok(());
104 }
105 self.loaded = true;
107
108 let mut headers = Vec::new();
109 let mut bodies = Vec::new();
110 let mut receipts = Vec::new();
111 let mut proofs = Vec::new();
112 let mut difficulties = Vec::new();
113
114 while let Some(entry) = self.reader.read_next_entry()? {
115 match entry.entry_type {
116 COMPRESSED_HEADER => headers.push(CompressedHeader::from_entry(&entry)?),
117 COMPRESSED_BODY => bodies.push(CompressedBody::from_entry(&entry)?),
118 COMPRESSED_SLIM_RECEIPTS => {
119 receipts.push(CompressedSlimReceipts::from_entry(&entry)?)
120 }
121 PROOF => proofs.push(Proof::from_entry(&entry)?),
122 TOTAL_DIFFICULTY => difficulties.push(TotalDifficulty::from_entry(&entry)?),
123 ACCUMULATOR => {
124 if self.accumulator.is_some() {
125 return Err(E2sError::Ssz("Multiple accumulator entries found".to_string()));
126 }
127 self.accumulator = Some(Accumulator::from_entry(&entry)?);
128 }
129 DYNAMIC_BLOCK_INDEX => {
130 if self.index.is_some() {
131 return Err(E2sError::Ssz("Multiple block index entries found".to_string()));
132 }
133 self.index = Some(DynamicBlockIndex::from_entry(&entry)?);
134 }
135 _ => self.other_entries.push(entry),
136 }
137 }
138
139 self.blocks = assemble_blocks(headers, bodies, receipts, proofs, difficulties)?;
140 Ok(())
141 }
142}
143
144impl<R: Read + Seek> Iterator for EreBlockTupleIterator<R> {
145 type Item = Result<BlockTuple, E2sError>;
146
147 fn next(&mut self) -> Option<Self::Item> {
148 if !self.loaded &&
149 let Err(err) = self.load()
150 {
151 return Some(Err(err));
152 }
153 self.blocks.pop_front().map(Ok)
154 }
155}
156
157impl<R: Read + Seek> StreamReader<R> for EreReader<R> {
158 type File = EreFile;
159 type Iterator = EreBlockTupleIterator<R>;
160
161 fn new(reader: R) -> Self {
163 Self { reader: E2StoreReader::new(reader) }
164 }
165
166 fn iter(self) -> EreBlockTupleIterator<R> {
171 EreBlockTupleIterator::new(self.reader)
172 }
173
174 fn read(self, network_name: String) -> Result<Self::File, E2sError> {
175 self.read_and_assemble(network_name)
176 }
177}
178
179impl<R: Read + Seek> EreReader<R> {
180 pub fn read_and_assemble(mut self, network_name: String) -> Result<EreFile, E2sError> {
183 match self.reader.read_version()? {
185 Some(entry) if entry.is_version() => {}
186 Some(_) => return Err(E2sError::Ssz("First entry is not a Version entry".to_string())),
187 None => return Err(E2sError::Ssz("Empty ere file".to_string())),
188 }
189
190 let mut iter = self.iter();
191 let blocks = (&mut iter).collect::<Result<Vec<_>, _>>()?;
192
193 let EreBlockTupleIterator { accumulator, index, other_entries, .. } = iter;
194
195 let index =
196 index.ok_or_else(|| E2sError::Ssz("ere file missing block index entry".to_string()))?;
197
198 validate_blocks_and_index(&blocks, &index)?;
199
200 let id = EreId::new(network_name, index.starting_number(), index.block_count() as u32);
201
202 let mut group = EreGroup::new(blocks, accumulator, index);
203 for entry in other_entries {
204 group.add_entry(entry);
205 }
206
207 Ok(EreFile::new(group, id))
208 }
209}
210
211#[derive(Debug)]
213pub struct EreWriter<W: Write> {
214 writer: E2StoreWriter<W>,
215}
216
217impl<W: Write> StreamWriter<W> for EreWriter<W> {
218 type File = EreFile;
219
220 fn new(writer: W) -> Self {
222 Self { writer: E2StoreWriter::new(writer) }
223 }
224
225 fn write_version(&mut self) -> Result<(), E2sError> {
227 self.writer.write_version()
228 }
229
230 fn write_file(&mut self, file: &EreFile) -> Result<(), E2sError> {
236 self.write_version()?;
237
238 let blocks = &file.group.blocks;
239
240 validate_blocks_and_index(blocks, &file.group.index)?;
244
245 for block in blocks {
246 self.writer.write_entry(&block.header.to_entry())?;
247 }
248 for block in blocks {
249 self.writer.write_entry(&block.body.to_entry())?;
250 }
251 for block in blocks {
252 if let Some(receipts) = &block.receipts {
253 self.writer.write_entry(&receipts.to_entry())?;
254 }
255 }
256 for block in blocks {
257 if let Some(proof) = &block.proof {
258 self.writer.write_entry(&proof.to_entry())?;
259 }
260 }
261 for block in blocks {
262 if let Some(total_difficulty) = &block.total_difficulty {
263 self.writer.write_entry(&total_difficulty.to_entry())?;
264 }
265 }
266
267 for entry in &file.group.other_entries {
268 self.writer.write_entry(entry)?;
269 }
270
271 if let Some(accumulator) = &file.group.accumulator {
272 self.writer.write_entry(&accumulator.to_entry())?;
273 }
274
275 self.writer.write_entry(&file.group.index.to_entry())?;
276
277 self.writer.flush()?;
278 Ok(())
279 }
280
281 fn flush(&mut self) -> Result<(), E2sError> {
283 self.writer.flush()
284 }
285}
286
287fn validate_blocks_and_index(
298 blocks: &[BlockTuple],
299 index: &DynamicBlockIndex,
300) -> Result<(), E2sError> {
301 if blocks.len() > MAX_BLOCKS_PER_ERE {
302 return Err(E2sError::Ssz(format!(
303 "ere file cannot contain more than {MAX_BLOCKS_PER_ERE} blocks, got {}",
304 blocks.len()
305 )));
306 }
307
308 if index.block_count() != blocks.len() {
309 return Err(E2sError::Ssz(format!(
310 "ere index covers {} blocks but the file has {}",
311 index.block_count(),
312 blocks.len()
313 )));
314 }
315
316 let Some(first) = blocks.first() else { return Ok(()) };
317 let (has_receipts, has_difficulty, has_proof) =
318 (first.receipts.is_some(), first.total_difficulty.is_some(), first.proof.is_some());
319
320 for block in blocks {
321 if block.receipts.is_some() != has_receipts ||
322 block.total_difficulty.is_some() != has_difficulty ||
323 block.proof.is_some() != has_proof
324 {
325 return Err(E2sError::Ssz(
326 "ere blocks must share the same optional components (each component present for \
327 all blocks or none)"
328 .to_string(),
329 ));
330 }
331 }
332
333 if index.component_count() != first.component_count() {
334 return Err(E2sError::Ssz(format!(
335 "ere index component-count {} does not match block component-count {}",
336 index.component_count(),
337 first.component_count()
338 )));
339 }
340
341 Ok(())
342}
343
344fn assemble_blocks(
350 headers: Vec<CompressedHeader>,
351 bodies: Vec<CompressedBody>,
352 receipts: Vec<CompressedSlimReceipts>,
353 proofs: Vec<Proof>,
354 difficulties: Vec<TotalDifficulty>,
355) -> Result<VecDeque<BlockTuple>, E2sError> {
356 let block_count = headers.len();
357 if bodies.len() != block_count {
358 return Err(E2sError::Ssz(format!(
359 "Mismatched header/body counts: headers={block_count}, bodies={}",
360 bodies.len()
361 )));
362 }
363
364 let mut receipts = optional_section(receipts, block_count)?;
365 let mut difficulties = optional_section(difficulties, block_count)?;
366 let mut proofs = optional_section(proofs, block_count)?;
367
368 let mut blocks = VecDeque::with_capacity(block_count);
369 for (header, body) in headers.into_iter().zip(bodies) {
370 let mut block = BlockTuple::new(header, body);
371 if let Some(receipts) = receipts.next().flatten() {
372 block = block.with_receipts(receipts);
373 }
374 if let Some(difficulty) = difficulties.next().flatten() {
375 block = block.with_total_difficulty(difficulty);
376 }
377 if let Some(proof) = proofs.next().flatten() {
378 block = block.with_proof(proof);
379 }
380 blocks.push_back(block);
381 }
382
383 Ok(blocks)
384}
385
386fn optional_section<T>(
392 section: Vec<T>,
393 block_count: usize,
394) -> Result<impl Iterator<Item = Option<T>>, E2sError> {
395 let slots: Vec<Option<T>> = match section.len() {
396 0 => (0..block_count).map(|_| None).collect(),
397 n if n == block_count => section.into_iter().map(Some).collect(),
398 n => {
399 return Err(E2sError::Ssz(format!(
400 "ere optional section length must be 0 or {block_count}, got {n}"
401 )))
402 }
403 };
404 Ok(slots.into_iter())
405}
406
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{B256, U256};
    use std::io::Cursor;

    /// Builds a block whose payloads are filled with bytes derived from `number`.
    ///
    /// The header holds `data_size` bytes and the body twice as many; the optional
    /// components are attached according to the flags.
    fn create_test_block(
        number: u64,
        data_size: usize,
        with_receipts: bool,
        with_difficulty: bool,
        with_proof: bool,
    ) -> BlockTuple {
        // Fill byte for a component: low byte of `number + offset`.
        let fill = |offset: u64| ((number + offset) % 256) as u8;

        let mut block = BlockTuple::new(
            CompressedHeader::new(vec![fill(0); data_size]),
            CompressedBody::new(vec![fill(1); data_size * 2]),
        );
        if with_receipts {
            block = block.with_receipts(CompressedSlimReceipts::new(vec![fill(2); data_size]));
        }
        if with_difficulty {
            block = block.with_total_difficulty(TotalDifficulty::new(U256::from(number * 1000)));
        }
        if with_proof {
            block = block.with_proof(Proof::new(vec![fill(3); data_size]));
        }
        block
    }

    /// Builds a file of `block_count` consecutive blocks starting at `start_block`, with an
    /// index sized to match and, optionally, an accumulator.
    fn create_test_ere_file(
        start_block: u64,
        block_count: usize,
        network: &str,
        with_receipts: bool,
        with_difficulty: bool,
        with_proof: bool,
        with_accumulator: bool,
    ) -> EreFile {
        let blocks: Vec<_> = (0..block_count)
            .map(|i| {
                create_test_block(
                    start_block + i as u64,
                    32,
                    with_receipts,
                    with_difficulty,
                    with_proof,
                )
            })
            .collect();

        // An empty file falls back to the two mandatory components.
        let component_count = blocks.first().map_or(2, BlockTuple::component_count);
        let offsets = (0..(block_count as u64 * component_count) as i64).collect();
        let index = DynamicBlockIndex::new(start_block, component_count, offsets);

        let accumulator = with_accumulator.then(|| Accumulator::new(B256::from([0xAA; 32])));

        EreFile::new(
            EreGroup::new(blocks, accumulator, index),
            EreId::new(network, start_block, block_count as u32),
        )
    }

    /// Asserts that two blocks carry the same payloads, component by component.
    fn assert_block_eq(actual: &BlockTuple, expected: &BlockTuple) {
        assert_eq!(actual.header.data, expected.header.data);
        assert_eq!(actual.body.data, expected.body.data);
        assert_eq!(
            actual.receipts.as_ref().map(|receipts| &receipts.data),
            expected.receipts.as_ref().map(|receipts| &receipts.data)
        );
        assert_eq!(
            actual.total_difficulty.as_ref().map(|difficulty| difficulty.value),
            expected.total_difficulty.as_ref().map(|difficulty| difficulty.value)
        );
        assert_eq!(
            actual.proof.as_ref().map(|proof| &proof.data),
            expected.proof.as_ref().map(|proof| &proof.data)
        );
    }

    /// Serialises `original` into memory and parses it back.
    fn write_then_read(original: &EreFile, network: &str) -> EreFile {
        let mut buffer = Vec::new();
        EreWriter::new(&mut buffer).write_file(original).expect("write");
        EreReader::new(Cursor::new(&buffer)).read(network.to_string()).expect("read")
    }

    /// Produces a raw store: a version entry followed by `entries` in the given order.
    fn store_with(entries: &[Entry]) -> Vec<u8> {
        let mut buffer = Vec::new();
        {
            let mut writer = E2StoreWriter::new(&mut buffer);
            writer.write_version().unwrap();
            for entry in entries {
                writer.write_entry(entry).unwrap();
            }
            writer.flush().unwrap();
        }
        buffer
    }

    /// Asserts that reading `bytes` as an ere file is rejected.
    fn assert_read_fails(bytes: &[u8]) {
        let result = EreReader::new(Cursor::new(bytes)).read("testnet".to_string());
        assert!(result.is_err());
    }

    /// Asserts that writing `file` is rejected.
    fn assert_write_fails(file: &EreFile) {
        let mut sink = Vec::new();
        let result = EreWriter::new(&mut sink).write_file(file);
        assert!(result.is_err());
    }

    #[test]
    fn test_ere_write_read_all_components() {
        let original = create_test_ere_file(1000, 4, "testnet", true, true, true, true);
        let read = write_then_read(&original, "testnet");

        assert_eq!(read.id.network_name, "testnet");
        assert_eq!(read.id.start_block, 1000);
        assert_eq!(read.id.block_count, 4);
        assert_eq!(read.group.blocks.len(), 4);
        assert_eq!(read.group.index, original.group.index);
        assert_eq!(read.group.accumulator, original.group.accumulator);

        for (read_block, original_block) in read.group.blocks.iter().zip(&original.group.blocks) {
            assert_eq!(read_block.component_count(), 5);
            assert_block_eq(read_block, original_block);
        }
    }

    #[test]
    fn test_ere_write_read_header_body_only() {
        let original = create_test_ere_file(2000, 3, "mainnet", false, false, false, false);
        let read = write_then_read(&original, "mainnet");

        assert_eq!(read.group.blocks.len(), 3);
        assert!(read.group.accumulator.is_none());
        assert_eq!(read.group.index, original.group.index);

        for (read_block, original_block) in read.group.blocks.iter().zip(&original.group.blocks) {
            assert_eq!(read_block.component_count(), 2);
            assert!(read_block.receipts.is_none());
            assert!(read_block.total_difficulty.is_none());
            assert!(read_block.proof.is_none());
            assert_block_eq(read_block, original_block);
        }
    }

    #[test]
    fn test_ere_write_read_partial_components() {
        let original = create_test_ere_file(0, 2, "sepolia", true, true, false, true);
        let read = write_then_read(&original, "sepolia");

        for block in &read.group.blocks {
            assert_eq!(block.component_count(), 4);
            assert!(block.receipts.is_some());
            assert!(block.total_difficulty.is_some());
            assert!(block.proof.is_none());
        }
    }

    #[test]
    fn test_ere_write_read_preserves_other_entries() {
        let mut original = create_test_ere_file(1000, 1, "testnet", true, true, true, true);
        original.group.add_entry(Entry::new([0x42, 0x42], vec![1, 2, 3, 4]));

        let read = write_then_read(&original, "testnet");

        assert_eq!(read.group.other_entries.len(), 1);
        assert_eq!(read.group.other_entries[0].entry_type, [0x42, 0x42]);
        assert_eq!(read.group.other_entries[0].data, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_ere_read_rejects_missing_index() {
        let block = create_test_block(1, 8, false, false, false);
        let buffer = store_with(&[block.header.to_entry(), block.body.to_entry()]);

        assert_read_fails(&buffer);
    }

    #[test]
    fn test_ere_read_rejects_partial_optional_section() {
        let b0 = create_test_block(0, 8, false, false, false);
        let b1 = create_test_block(1, 8, false, false, false);
        // Two blocks but only a single receipts entry.
        let buffer = store_with(&[
            b0.header.to_entry(),
            b1.header.to_entry(),
            b0.body.to_entry(),
            b1.body.to_entry(),
            CompressedSlimReceipts::new(vec![0xCC; 8]).to_entry(),
            DynamicBlockIndex::new(0, 2, vec![0, 1, 2, 3]).to_entry(),
        ]);

        assert_read_fails(&buffer);
    }

    #[test]
    fn test_ere_write_read_empty() {
        let original = create_test_ere_file(0, 0, "testnet", false, false, false, false);
        let read = write_then_read(&original, "testnet");

        assert_eq!(read.group.blocks.len(), 0);
        assert_eq!(read.id.block_count, 0);
        assert_eq!(read.group.index, original.group.index);
    }

    #[test]
    fn test_ere_read_rejects_empty_input() {
        assert_read_fails(&[]);
    }

    #[test]
    fn test_ere_read_rejects_non_version_first_entry() {
        let mut buffer = Vec::new();
        Entry::new([0x99, 0x99], vec![1, 2, 3]).write(&mut buffer).unwrap();

        assert_read_fails(&buffer);
    }

    #[test]
    fn test_ere_read_rejects_header_body_mismatch() {
        let b0 = create_test_block(0, 8, false, false, false);
        let b1 = create_test_block(1, 8, false, false, false);
        // Two headers but only one body.
        let buffer = store_with(&[
            b0.header.to_entry(),
            b1.header.to_entry(),
            b0.body.to_entry(),
            DynamicBlockIndex::new(0, 2, vec![0, 1, 2, 3]).to_entry(),
        ]);

        assert_read_fails(&buffer);
    }

    #[test]
    fn test_ere_read_rejects_duplicate_index() {
        let buffer = store_with(&[
            DynamicBlockIndex::new(0, 2, Vec::new()).to_entry(),
            DynamicBlockIndex::new(0, 2, Vec::new()).to_entry(),
        ]);

        assert_read_fails(&buffer);
    }

    #[test]
    fn test_ere_write_rejects_non_uniform_components() {
        // Only the first block carries receipts.
        let blocks = vec![
            create_test_block(0, 8, true, false, false),
            create_test_block(1, 8, false, false, false),
        ];
        let index = DynamicBlockIndex::new(0, 3, vec![0, 1, 2, 3, 4, 5]);
        let file = EreFile::new(EreGroup::new(blocks, None, index), EreId::new("testnet", 0, 2));

        assert_write_fails(&file);
    }

    #[test]
    fn test_ere_write_rejects_too_many_blocks() {
        let file =
            create_test_ere_file(0, MAX_BLOCKS_PER_ERE + 1, "testnet", false, false, false, false);

        assert_write_fails(&file);
    }

    #[test]
    fn test_ere_write_rejects_index_block_count_mismatch() {
        let mut file = create_test_ere_file(0, 2, "testnet", false, false, false, false);
        file.group.index = DynamicBlockIndex::new(0, 2, vec![0, 1]);

        assert_write_fails(&file);
    }

    #[test]
    fn test_ere_read_rejects_index_block_count_mismatch() {
        let b0 = create_test_block(0, 8, false, false, false);
        let b1 = create_test_block(1, 8, false, false, false);
        let buffer = store_with(&[
            b0.header.to_entry(),
            b1.header.to_entry(),
            b0.body.to_entry(),
            b1.body.to_entry(),
            DynamicBlockIndex::new(0, 2, vec![0, 1]).to_entry(),
        ]);

        assert_read_fails(&buffer);
    }

    #[test]
    fn test_ere_write_version_then_write_file_writes_single_version() {
        let original = create_test_ere_file(0, 1, "testnet", false, false, false, false);

        let mut buffer = Vec::new();
        {
            let mut writer = EreWriter::new(&mut buffer);
            writer.write_version().unwrap();
            writer.write_file(&original).unwrap();
        }

        let entries = E2StoreReader::new(Cursor::new(&buffer)).entries().unwrap();
        let version_entries = entries.iter().filter(|entry| entry.is_version()).count();
        assert_eq!(version_entries, 1);
    }

    #[test]
    fn test_ere_read_rejects_duplicate_accumulator() {
        let buffer = store_with(&[
            Accumulator::new(B256::from([0x11; 32])).to_entry(),
            Accumulator::new(B256::from([0x22; 32])).to_entry(),
            DynamicBlockIndex::new(0, 2, Vec::new()).to_entry(),
        ]);

        assert_read_fails(&buffer);
    }
}