1use crate::{
9 common::file_ops::{EraFileFormat, StreamReader, StreamWriter},
10 e2s::{
11 error::E2sError,
12 file::{E2StoreReader, E2StoreWriter},
13 types::{Entry, IndexEntry, Version},
14 },
15 era1::types::{
16 execution::{
17 Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
18 TotalDifficulty, ACCUMULATOR, COMPRESSED_BODY, COMPRESSED_HEADER, COMPRESSED_RECEIPTS,
19 MAX_BLOCKS_PER_ERA1, TOTAL_DIFFICULTY,
20 },
21 group::{BlockIndex, Era1Group, Era1Id, BLOCK_INDEX},
22 },
23};
24use alloy_primitives::BlockNumber;
25use std::{
26 collections::VecDeque,
27 io::{Read, Seek, Write},
28};
29
30#[derive(Debug)]
32pub struct Era1File {
33 pub version: Version,
35
36 pub group: Era1Group,
38
39 pub id: Era1Id,
41}
42
43impl EraFileFormat for Era1File {
44 type EraGroup = Era1Group;
45 type Id = Era1Id;
46
47 fn new(group: Era1Group, id: Era1Id) -> Self {
49 Self { version: Version, group, id }
50 }
51
52 fn version(&self) -> &Version {
53 &self.version
54 }
55
56 fn group(&self) -> &Self::EraGroup {
57 &self.group
58 }
59
60 fn id(&self) -> &Self::Id {
61 &self.id
62 }
63}
64
65impl Era1File {
66 pub fn get_block_by_number(&self, number: BlockNumber) -> Option<&BlockTuple> {
68 let index = (number - self.group.block_index.starting_number()) as usize;
69 (index < self.group.blocks.len()).then(|| &self.group.blocks[index])
70 }
71
72 pub fn block_range(&self) -> std::ops::RangeInclusive<BlockNumber> {
74 let start = self.group.block_index.starting_number();
75 let end = start + (self.group.blocks.len() as u64) - 1;
76 start..=end
77 }
78
79 pub fn contains_block(&self, number: BlockNumber) -> bool {
81 self.block_range().contains(&number)
82 }
83}
84
85#[derive(Debug)]
87pub struct Era1Reader<R: Read> {
88 reader: E2StoreReader<R>,
89}
90
91#[derive(Debug)]
93pub struct BlockTupleIterator<R: Read> {
94 reader: E2StoreReader<R>,
95 headers: VecDeque<CompressedHeader>,
96 bodies: VecDeque<CompressedBody>,
97 receipts: VecDeque<CompressedReceipts>,
98 difficulties: VecDeque<TotalDifficulty>,
99 other_entries: Vec<Entry>,
100 accumulator: Option<Accumulator>,
101 block_index: Option<BlockIndex>,
102}
103
104impl<R: Read> BlockTupleIterator<R> {
105 fn new(reader: E2StoreReader<R>) -> Self {
106 Self {
107 reader,
108 headers: Default::default(),
109 bodies: Default::default(),
110 receipts: Default::default(),
111 difficulties: Default::default(),
112 other_entries: Default::default(),
113 accumulator: None,
114 block_index: None,
115 }
116 }
117}
118
119impl<R: Read + Seek> Iterator for BlockTupleIterator<R> {
120 type Item = Result<BlockTuple, E2sError>;
121
122 fn next(&mut self) -> Option<Self::Item> {
123 self.next_result().transpose()
124 }
125}
126
127impl<R: Read + Seek> BlockTupleIterator<R> {
128 fn next_result(&mut self) -> Result<Option<BlockTuple>, E2sError> {
129 loop {
130 let Some(entry) = self.reader.read_next_entry()? else {
131 return Ok(None);
132 };
133
134 match entry.entry_type {
135 COMPRESSED_HEADER => {
136 self.headers.push_back(CompressedHeader::from_entry(&entry)?);
137 }
138 COMPRESSED_BODY => {
139 self.bodies.push_back(CompressedBody::from_entry(&entry)?);
140 }
141 COMPRESSED_RECEIPTS => {
142 self.receipts.push_back(CompressedReceipts::from_entry(&entry)?);
143 }
144 TOTAL_DIFFICULTY => {
145 self.difficulties.push_back(TotalDifficulty::from_entry(&entry)?);
146 }
147 ACCUMULATOR => {
148 if self.accumulator.is_some() {
149 return Err(E2sError::Ssz("Multiple accumulator entries found".to_string()));
150 }
151 self.accumulator = Some(Accumulator::from_entry(&entry)?);
152 }
153 BLOCK_INDEX => {
154 if self.block_index.is_some() {
155 return Err(E2sError::Ssz("Multiple block index entries found".to_string()));
156 }
157 self.block_index = Some(BlockIndex::from_entry(&entry)?);
158 }
159 _ => {
160 self.other_entries.push(entry);
161 }
162 }
163
164 if !self.headers.is_empty() &&
165 !self.bodies.is_empty() &&
166 !self.receipts.is_empty() &&
167 !self.difficulties.is_empty()
168 {
169 let header = self.headers.pop_front().unwrap();
170 let body = self.bodies.pop_front().unwrap();
171 let receipt = self.receipts.pop_front().unwrap();
172 let difficulty = self.difficulties.pop_front().unwrap();
173
174 return Ok(Some(BlockTuple::new(header, body, receipt, difficulty)));
175 }
176 }
177 }
178}
179
180impl<R: Read + Seek> StreamReader<R> for Era1Reader<R> {
181 type File = Era1File;
182 type Iterator = BlockTupleIterator<R>;
183
184 fn new(reader: R) -> Self {
186 Self { reader: E2StoreReader::new(reader) }
187 }
188
189 fn iter(self) -> BlockTupleIterator<R> {
191 BlockTupleIterator::new(self.reader)
192 }
193
194 fn read(self, network_name: String) -> Result<Self::File, E2sError> {
195 self.read_and_assemble(network_name)
196 }
197}
198
199impl<R: Read + Seek> Era1Reader<R> {
200 pub fn read_and_assemble(mut self, network_name: String) -> Result<Era1File, E2sError> {
203 let _version_entry = match self.reader.read_version()? {
205 Some(entry) if entry.is_version() => entry,
206 Some(_) => return Err(E2sError::Ssz("First entry is not a Version entry".to_string())),
207 None => return Err(E2sError::Ssz("Empty Era1 file".to_string())),
208 };
209
210 let mut iter = self.iter();
211 let blocks = (&mut iter).collect::<Result<Vec<_>, _>>()?;
212
213 let BlockTupleIterator {
214 headers,
215 bodies,
216 receipts,
217 difficulties,
218 other_entries,
219 accumulator,
220 block_index,
221 ..
222 } = iter;
223
224 if headers.len() != bodies.len() ||
226 headers.len() != receipts.len() ||
227 headers.len() != difficulties.len()
228 {
229 return Err(E2sError::Ssz(format!(
230 "Mismatched block component counts: headers={}, bodies={}, receipts={}, difficulties={}",
231 headers.len(), bodies.len(), receipts.len(), difficulties.len()
232 )));
233 }
234
235 let accumulator = accumulator
236 .ok_or_else(|| E2sError::Ssz("Era1 file missing accumulator entry".to_string()))?;
237
238 let block_index = block_index
239 .ok_or_else(|| E2sError::Ssz("Era1 file missing block index entry".to_string()))?;
240
241 let mut group = Era1Group::new(blocks, accumulator, block_index.clone());
242
243 for entry in other_entries {
245 group.add_entry(entry);
246 }
247
248 let id = Era1Id::new(
249 network_name,
250 block_index.starting_number(),
251 block_index.offsets().len() as u32,
252 );
253
254 Ok(Era1File::new(group, id))
255 }
256}
257
258#[derive(Debug)]
260pub struct Era1Writer<W: Write> {
261 writer: E2StoreWriter<W>,
262 has_written_version: bool,
263 has_written_blocks: bool,
264 has_written_accumulator: bool,
265 has_written_block_index: bool,
266}
267
268impl<W: Write> StreamWriter<W> for Era1Writer<W> {
269 type File = Era1File;
270
271 fn new(writer: W) -> Self {
273 Self {
274 writer: E2StoreWriter::new(writer),
275 has_written_version: false,
276 has_written_blocks: false,
277 has_written_accumulator: false,
278 has_written_block_index: false,
279 }
280 }
281
282 fn write_version(&mut self) -> Result<(), E2sError> {
284 if self.has_written_version {
285 return Ok(());
286 }
287
288 self.writer.write_version()?;
289 self.has_written_version = true;
290 Ok(())
291 }
292
293 fn write_file(&mut self, era1_file: &Era1File) -> Result<(), E2sError> {
295 self.write_version()?;
297
298 if era1_file.group.blocks.len() > MAX_BLOCKS_PER_ERA1 {
300 return Err(E2sError::Ssz("Era1 file cannot contain more than 8192 blocks".to_string()));
301 }
302
303 for block in &era1_file.group.blocks {
305 self.write_block(block)?;
306 }
307
308 for entry in &era1_file.group.other_entries {
310 self.writer.write_entry(entry)?;
311 }
312
313 self.write_accumulator(&era1_file.group.accumulator)?;
315
316 self.write_block_index(&era1_file.group.block_index)?;
318
319 self.writer.flush()?;
321
322 Ok(())
323 }
324
325 fn flush(&mut self) -> Result<(), E2sError> {
327 self.writer.flush()
328 }
329}
330
331impl<W: Write> Era1Writer<W> {
332 pub fn write_block(&mut self, block_tuple: &BlockTuple) -> Result<(), E2sError> {
334 if !self.has_written_version {
335 self.write_version()?;
336 }
337
338 if self.has_written_accumulator || self.has_written_block_index {
339 return Err(E2sError::Ssz(
340 "Cannot write blocks after accumulator or block index".to_string(),
341 ));
342 }
343
344 let header_entry = block_tuple.header.to_entry();
346 self.writer.write_entry(&header_entry)?;
347
348 let body_entry = block_tuple.body.to_entry();
350 self.writer.write_entry(&body_entry)?;
351
352 let receipts_entry = block_tuple.receipts.to_entry();
354 self.writer.write_entry(&receipts_entry)?;
355
356 let difficulty_entry = block_tuple.total_difficulty.to_entry();
358 self.writer.write_entry(&difficulty_entry)?;
359
360 self.has_written_blocks = true;
361
362 Ok(())
363 }
364
365 pub fn write_block_index(&mut self, block_index: &BlockIndex) -> Result<(), E2sError> {
367 if !self.has_written_version {
368 self.write_version()?;
369 }
370
371 if self.has_written_block_index {
372 return Err(E2sError::Ssz("Block index already written".to_string()));
373 }
374
375 let block_index_entry = block_index.to_entry();
376 self.writer.write_entry(&block_index_entry)?;
377 self.has_written_block_index = true;
378
379 Ok(())
380 }
381
382 pub fn write_accumulator(&mut self, accumulator: &Accumulator) -> Result<(), E2sError> {
384 if !self.has_written_version {
385 self.write_version()?;
386 }
387
388 if self.has_written_accumulator {
389 return Err(E2sError::Ssz("Accumulator already written".to_string()));
390 }
391
392 if self.has_written_block_index {
393 return Err(E2sError::Ssz("Cannot write accumulator after block index".to_string()));
394 }
395
396 let accumulator_entry = accumulator.to_entry();
397 self.writer.write_entry(&accumulator_entry)?;
398 self.has_written_accumulator = true;
399 Ok(())
400 }
401}
402
403#[cfg(test)]
404mod tests {
405 use super::*;
406 use crate::common::file_ops::{FileReader, FileWriter};
407 use alloy_primitives::{B256, U256};
408 use std::io::Cursor;
409 use tempfile::tempdir;
410
411 fn create_test_block(number: BlockNumber, data_size: usize) -> BlockTuple {
413 let header_data = vec![(number % 256) as u8; data_size];
414 let header = CompressedHeader::new(header_data);
415
416 let body_data = vec![((number + 1) % 256) as u8; data_size * 2];
417 let body = CompressedBody::new(body_data);
418
419 let receipts_data = vec![((number + 2) % 256) as u8; data_size];
420 let receipts = CompressedReceipts::new(receipts_data);
421
422 let difficulty = TotalDifficulty::new(U256::from(number * 1000));
423
424 BlockTuple::new(header, body, receipts, difficulty)
425 }
426
427 fn create_test_era1_file(
429 start_block: BlockNumber,
430 block_count: usize,
431 network: &str,
432 ) -> Era1File {
433 let mut blocks = Vec::with_capacity(block_count);
435 for i in 0..block_count {
436 let block_num = start_block + i as u64;
437 blocks.push(create_test_block(block_num, 32));
438 }
439
440 let accumulator = Accumulator::new(B256::from([0xAA; 32]));
441
442 let mut offsets = Vec::with_capacity(block_count);
443 for i in 0..block_count {
444 offsets.push(i as i64 * 100);
445 }
446 let block_index = BlockIndex::new(start_block, offsets);
447 let group = Era1Group::new(blocks, accumulator, block_index);
448 let id = Era1Id::new(network, start_block, block_count as u32);
449
450 Era1File::new(group, id)
451 }
452
453 #[test]
454 fn test_era1_roundtrip_memory() -> Result<(), E2sError> {
455 let start_block = 1000;
457 let era1_file = create_test_era1_file(1000, 5, "testnet");
458
459 let mut buffer = Vec::new();
461 {
462 let mut writer = Era1Writer::new(&mut buffer);
463 writer.write_file(&era1_file)?;
464 }
465
466 let reader = Era1Reader::new(Cursor::new(&buffer));
468 let read_era1 = reader.read("testnet".to_string())?;
469
470 assert_eq!(read_era1.id.network_name, "testnet");
472 assert_eq!(read_era1.id.start_block, 1000);
473 assert_eq!(read_era1.id.block_count, 5);
474 assert_eq!(read_era1.group.blocks.len(), 5);
475
476 assert_eq!(read_era1.group.blocks[0].total_difficulty.value, U256::from(1000 * 1000));
478 assert_eq!(read_era1.group.blocks[1].total_difficulty.value, U256::from(1001 * 1000));
479
480 assert_eq!(read_era1.group.blocks[0].header.data, vec![(start_block % 256) as u8; 32]);
482 assert_eq!(read_era1.group.blocks[0].body.data, vec![((start_block + 1) % 256) as u8; 64]);
483 assert_eq!(
484 read_era1.group.blocks[0].receipts.data,
485 vec![((start_block + 2) % 256) as u8; 32]
486 );
487
488 assert!(read_era1.contains_block(1000));
490 assert!(read_era1.contains_block(1004));
491 assert!(!read_era1.contains_block(999));
492 assert!(!read_era1.contains_block(1005));
493
494 let block_1002 = read_era1.get_block_by_number(1002);
495 assert!(block_1002.is_some());
496 assert_eq!(block_1002.unwrap().header.data, vec![((start_block + 2) % 256) as u8; 32]);
497
498 Ok(())
499 }
500
501 #[test]
502 fn test_era1_roundtrip_file() -> Result<(), E2sError> {
503 let temp_dir = tempdir().expect("Failed to create temp directory");
505 let file_path = temp_dir.path().join("test_roundtrip.era1");
506
507 let era1_file = create_test_era1_file(2000, 3, "mainnet");
509 Era1Writer::create(&file_path, &era1_file)?;
510
511 let read_era1 = Era1Reader::open(&file_path, "mainnet")?;
513
514 assert_eq!(read_era1.id.network_name, "mainnet");
516 assert_eq!(read_era1.id.start_block, 2000);
517 assert_eq!(read_era1.id.block_count, 3);
518 assert_eq!(read_era1.group.blocks.len(), 3);
519
520 for i in 0..3 {
522 let block_num = 2000 + i as u64;
523 let block = read_era1.get_block_by_number(block_num);
524 assert!(block.is_some());
525 assert_eq!(block.unwrap().header.data, vec![block_num as u8; 32]);
526 }
527
528 Ok(())
529 }
530}