1use crate::{
9 common::file_ops::{EraFileFormat, FileReader, StreamReader, StreamWriter},
10 e2s::{
11 error::E2sError,
12 file::{E2StoreReader, E2StoreWriter},
13 types::{Entry, IndexEntry, Version},
14 },
15 era1::types::{
16 execution::{
17 Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
18 TotalDifficulty, ACCUMULATOR, COMPRESSED_BODY, COMPRESSED_HEADER, COMPRESSED_RECEIPTS,
19 MAX_BLOCKS_PER_ERA1, TOTAL_DIFFICULTY,
20 },
21 group::{BlockIndex, Era1Group, Era1Id, BLOCK_INDEX},
22 },
23};
24use alloy_primitives::BlockNumber;
25use std::{
26 collections::VecDeque,
27 fs::File,
28 io::{Read, Seek, Write},
29};
30
/// A complete Era1 file: a version marker followed by a group of historical
/// block data and the identifier it was read from / will be written under.
#[derive(Debug)]
pub struct Era1File {
    /// The version entry, always the first entry in the file.
    pub version: Version,

    /// The block tuples, accumulator, block index and any extra entries.
    pub group: Era1Group,

    /// Identifier for this file (network name, starting block, block count).
    pub id: Era1Id,
}
43
impl EraFileFormat for Era1File {
    type EraGroup = Era1Group;
    type Id = Era1Id;

    /// Creates a new [`Era1File`] from its block group and identifier.
    fn new(group: Era1Group, id: Era1Id) -> Self {
        Self { version: Version, group, id }
    }

    /// Returns the file's version entry.
    fn version(&self) -> &Version {
        &self.version
    }

    /// Returns the group of block data stored in this file.
    fn group(&self) -> &Self::EraGroup {
        &self.group
    }

    /// Returns the file identifier.
    fn id(&self) -> &Self::Id {
        &self.id
    }
}
65
66impl Era1File {
67 pub fn get_block_by_number(&self, number: BlockNumber) -> Option<&BlockTuple> {
69 let index = (number - self.group.block_index.starting_number()) as usize;
70 (index < self.group.blocks.len()).then(|| &self.group.blocks[index])
71 }
72
73 pub fn block_range(&self) -> std::ops::RangeInclusive<BlockNumber> {
75 let start = self.group.block_index.starting_number();
76 let end = start + (self.group.blocks.len() as u64) - 1;
77 start..=end
78 }
79
80 pub fn contains_block(&self, number: BlockNumber) -> bool {
82 self.block_range().contains(&number)
83 }
84}
85
/// Reader for Era1 files that wraps an [`E2StoreReader`] over any [`Read`] source.
#[derive(Debug)]
pub struct Era1Reader<R: Read> {
    /// The underlying e2store entry reader.
    reader: E2StoreReader<R>,
}
91
/// Streaming iterator over the [`BlockTuple`]s of an Era1 file.
///
/// Entries of the four block-component types are buffered as they are read;
/// a tuple is yielded as soon as one of each component is available.
#[derive(Debug)]
pub struct BlockTupleIterator<R: Read> {
    /// The underlying e2store entry reader.
    reader: E2StoreReader<R>,
    /// Headers read but not yet paired into a tuple.
    headers: VecDeque<CompressedHeader>,
    /// Bodies read but not yet paired into a tuple.
    bodies: VecDeque<CompressedBody>,
    /// Receipts read but not yet paired into a tuple.
    receipts: VecDeque<CompressedReceipts>,
    /// Total difficulties read but not yet paired into a tuple.
    difficulties: VecDeque<TotalDifficulty>,
    /// Entries of unrecognized types, kept for later inclusion in the group.
    other_entries: Vec<Entry>,
    /// The accumulator entry, once encountered (at most one allowed).
    accumulator: Option<Accumulator>,
    /// The block index entry, once encountered (at most one allowed).
    block_index: Option<BlockIndex>,
}
104
105impl<R: Read> BlockTupleIterator<R> {
106 fn new(reader: E2StoreReader<R>) -> Self {
107 Self {
108 reader,
109 headers: Default::default(),
110 bodies: Default::default(),
111 receipts: Default::default(),
112 difficulties: Default::default(),
113 other_entries: Default::default(),
114 accumulator: None,
115 block_index: None,
116 }
117 }
118}
119
120impl<R: Read + Seek> Iterator for BlockTupleIterator<R> {
121 type Item = Result<BlockTuple, E2sError>;
122
123 fn next(&mut self) -> Option<Self::Item> {
124 self.next_result().transpose()
125 }
126}
127
128impl<R: Read + Seek> BlockTupleIterator<R> {
129 fn next_result(&mut self) -> Result<Option<BlockTuple>, E2sError> {
130 loop {
131 let Some(entry) = self.reader.read_next_entry()? else {
132 return Ok(None);
133 };
134
135 match entry.entry_type {
136 COMPRESSED_HEADER => {
137 self.headers.push_back(CompressedHeader::from_entry(&entry)?);
138 }
139 COMPRESSED_BODY => {
140 self.bodies.push_back(CompressedBody::from_entry(&entry)?);
141 }
142 COMPRESSED_RECEIPTS => {
143 self.receipts.push_back(CompressedReceipts::from_entry(&entry)?);
144 }
145 TOTAL_DIFFICULTY => {
146 self.difficulties.push_back(TotalDifficulty::from_entry(&entry)?);
147 }
148 ACCUMULATOR => {
149 if self.accumulator.is_some() {
150 return Err(E2sError::Ssz("Multiple accumulator entries found".to_string()));
151 }
152 self.accumulator = Some(Accumulator::from_entry(&entry)?);
153 }
154 BLOCK_INDEX => {
155 if self.block_index.is_some() {
156 return Err(E2sError::Ssz("Multiple block index entries found".to_string()));
157 }
158 self.block_index = Some(BlockIndex::from_entry(&entry)?);
159 }
160 _ => {
161 self.other_entries.push(entry);
162 }
163 }
164
165 if !self.headers.is_empty() &&
166 !self.bodies.is_empty() &&
167 !self.receipts.is_empty() &&
168 !self.difficulties.is_empty()
169 {
170 let header = self.headers.pop_front().unwrap();
171 let body = self.bodies.pop_front().unwrap();
172 let receipt = self.receipts.pop_front().unwrap();
173 let difficulty = self.difficulties.pop_front().unwrap();
174
175 return Ok(Some(BlockTuple::new(header, body, receipt, difficulty)));
176 }
177 }
178 }
179}
180
181impl<R: Read + Seek> StreamReader<R> for Era1Reader<R> {
182 type File = Era1File;
183 type Iterator = BlockTupleIterator<R>;
184
185 fn new(reader: R) -> Self {
187 Self { reader: E2StoreReader::new(reader) }
188 }
189
190 fn iter(self) -> BlockTupleIterator<R> {
192 BlockTupleIterator::new(self.reader)
193 }
194
195 fn read(self, network_name: String) -> Result<Self::File, E2sError> {
196 self.read_and_assemble(network_name)
197 }
198}
199
200impl<R: Read + Seek> Era1Reader<R> {
201 pub fn read_and_assemble(mut self, network_name: String) -> Result<Era1File, E2sError> {
204 let _version_entry = match self.reader.read_version()? {
206 Some(entry) if entry.is_version() => entry,
207 Some(_) => return Err(E2sError::Ssz("First entry is not a Version entry".to_string())),
208 None => return Err(E2sError::Ssz("Empty Era1 file".to_string())),
209 };
210
211 let mut iter = self.iter();
212 let blocks = (&mut iter).collect::<Result<Vec<_>, _>>()?;
213
214 let BlockTupleIterator {
215 headers,
216 bodies,
217 receipts,
218 difficulties,
219 other_entries,
220 accumulator,
221 block_index,
222 ..
223 } = iter;
224
225 if headers.len() != bodies.len() ||
227 headers.len() != receipts.len() ||
228 headers.len() != difficulties.len()
229 {
230 return Err(E2sError::Ssz(format!(
231 "Mismatched block component counts: headers={}, bodies={}, receipts={}, difficulties={}",
232 headers.len(), bodies.len(), receipts.len(), difficulties.len()
233 )));
234 }
235
236 let accumulator = accumulator
237 .ok_or_else(|| E2sError::Ssz("Era1 file missing accumulator entry".to_string()))?;
238
239 let block_index = block_index
240 .ok_or_else(|| E2sError::Ssz("Era1 file missing block index entry".to_string()))?;
241
242 let mut group = Era1Group::new(blocks, accumulator, block_index.clone());
243
244 for entry in other_entries {
246 group.add_entry(entry);
247 }
248
249 let id = Era1Id::new(
250 network_name,
251 block_index.starting_number(),
252 block_index.offsets().len() as u32,
253 );
254
255 Ok(Era1File::new(group, id))
256 }
257}
258
259impl FileReader for Era1Reader<File> {}
260
/// Writer for Era1 files that enforces the required entry order:
/// version, blocks, (other entries), accumulator, block index.
#[derive(Debug)]
pub struct Era1Writer<W: Write> {
    /// The underlying e2store entry writer.
    writer: E2StoreWriter<W>,
    /// Whether the version entry has been written.
    has_written_version: bool,
    /// Whether at least one block tuple has been written.
    has_written_blocks: bool,
    /// Whether the accumulator entry has been written.
    has_written_accumulator: bool,
    /// Whether the block index entry has been written.
    has_written_block_index: bool,
}
270
271impl<W: Write> StreamWriter<W> for Era1Writer<W> {
272 type File = Era1File;
273
274 fn new(writer: W) -> Self {
276 Self {
277 writer: E2StoreWriter::new(writer),
278 has_written_version: false,
279 has_written_blocks: false,
280 has_written_accumulator: false,
281 has_written_block_index: false,
282 }
283 }
284
285 fn write_version(&mut self) -> Result<(), E2sError> {
287 if self.has_written_version {
288 return Ok(());
289 }
290
291 self.writer.write_version()?;
292 self.has_written_version = true;
293 Ok(())
294 }
295
296 fn write_file(&mut self, era1_file: &Era1File) -> Result<(), E2sError> {
298 self.write_version()?;
300
301 if era1_file.group.blocks.len() > MAX_BLOCKS_PER_ERA1 {
303 return Err(E2sError::Ssz("Era1 file cannot contain more than 8192 blocks".to_string()));
304 }
305
306 for block in &era1_file.group.blocks {
308 self.write_block(block)?;
309 }
310
311 for entry in &era1_file.group.other_entries {
313 self.writer.write_entry(entry)?;
314 }
315
316 self.write_accumulator(&era1_file.group.accumulator)?;
318
319 self.write_block_index(&era1_file.group.block_index)?;
321
322 self.writer.flush()?;
324
325 Ok(())
326 }
327
328 fn flush(&mut self) -> Result<(), E2sError> {
330 self.writer.flush()
331 }
332}
333
334impl<W: Write> Era1Writer<W> {
335 pub fn write_block(&mut self, block_tuple: &BlockTuple) -> Result<(), E2sError> {
337 if !self.has_written_version {
338 self.write_version()?;
339 }
340
341 if self.has_written_accumulator || self.has_written_block_index {
342 return Err(E2sError::Ssz(
343 "Cannot write blocks after accumulator or block index".to_string(),
344 ));
345 }
346
347 let header_entry = block_tuple.header.to_entry();
349 self.writer.write_entry(&header_entry)?;
350
351 let body_entry = block_tuple.body.to_entry();
353 self.writer.write_entry(&body_entry)?;
354
355 let receipts_entry = block_tuple.receipts.to_entry();
357 self.writer.write_entry(&receipts_entry)?;
358
359 let difficulty_entry = block_tuple.total_difficulty.to_entry();
361 self.writer.write_entry(&difficulty_entry)?;
362
363 self.has_written_blocks = true;
364
365 Ok(())
366 }
367
368 pub fn write_block_index(&mut self, block_index: &BlockIndex) -> Result<(), E2sError> {
370 if !self.has_written_version {
371 self.write_version()?;
372 }
373
374 if self.has_written_block_index {
375 return Err(E2sError::Ssz("Block index already written".to_string()));
376 }
377
378 let block_index_entry = block_index.to_entry();
379 self.writer.write_entry(&block_index_entry)?;
380 self.has_written_block_index = true;
381
382 Ok(())
383 }
384
385 pub fn write_accumulator(&mut self, accumulator: &Accumulator) -> Result<(), E2sError> {
387 if !self.has_written_version {
388 self.write_version()?;
389 }
390
391 if self.has_written_accumulator {
392 return Err(E2sError::Ssz("Accumulator already written".to_string()));
393 }
394
395 if self.has_written_block_index {
396 return Err(E2sError::Ssz("Cannot write accumulator after block index".to_string()));
397 }
398
399 let accumulator_entry = accumulator.to_entry();
400 self.writer.write_entry(&accumulator_entry)?;
401 self.has_written_accumulator = true;
402 Ok(())
403 }
404}
405
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::file_ops::FileWriter;
    use alloy_primitives::{B256, U256};
    use std::io::Cursor;
    use tempfile::tempdir;

    /// Builds a synthetic block tuple whose component bytes are derived from
    /// `number`, so round-trip assertions can recompute the expected data.
    fn create_test_block(number: BlockNumber, data_size: usize) -> BlockTuple {
        // Header bytes are `number % 256` repeated `data_size` times.
        let header_data = vec![(number % 256) as u8; data_size];
        let header = CompressedHeader::new(header_data);

        // Body is twice as long and offset by 1 so each component differs.
        let body_data = vec![((number + 1) % 256) as u8; data_size * 2];
        let body = CompressedBody::new(body_data);

        let receipts_data = vec![((number + 2) % 256) as u8; data_size];
        let receipts = CompressedReceipts::new(receipts_data);

        let difficulty = TotalDifficulty::new(U256::from(number * 1000));

        BlockTuple::new(header, body, receipts, difficulty)
    }

    /// Builds a complete in-memory Era1 file with `block_count` synthetic
    /// blocks starting at `start_block`, tagged with `network`.
    fn create_test_era1_file(
        start_block: BlockNumber,
        block_count: usize,
        network: &str,
    ) -> Era1File {
        let mut blocks = Vec::with_capacity(block_count);
        for i in 0..block_count {
            let block_num = start_block + i as u64;
            blocks.push(create_test_block(block_num, 32));
        }

        let accumulator = Accumulator::new(B256::from([0xAA; 32]));

        // Offsets are synthetic (not real file positions); the reader under
        // test does not rely on them for locating entries.
        let mut offsets = Vec::with_capacity(block_count);
        for i in 0..block_count {
            offsets.push(i as i64 * 100);
        }
        let block_index = BlockIndex::new(start_block, offsets);
        let group = Era1Group::new(blocks, accumulator, block_index);
        let id = Era1Id::new(network, start_block, block_count as u32);

        Era1File::new(group, id)
    }

    /// Writes an Era1 file to an in-memory buffer and reads it back,
    /// verifying ids, block data, range queries and lookups.
    #[test]
    fn test_era1_roundtrip_memory() -> Result<(), E2sError> {
        let start_block = 1000;
        let era1_file = create_test_era1_file(1000, 5, "testnet");

        let mut buffer = Vec::new();
        {
            let mut writer = Era1Writer::new(&mut buffer);
            writer.write_file(&era1_file)?;
        }

        let reader = Era1Reader::new(Cursor::new(&buffer));
        let read_era1 = reader.read("testnet".to_string())?;

        assert_eq!(read_era1.id.network_name, "testnet");
        assert_eq!(read_era1.id.start_block, 1000);
        assert_eq!(read_era1.id.block_count, 5);
        assert_eq!(read_era1.group.blocks.len(), 5);

        // Difficulties follow the `number * 1000` rule from create_test_block.
        assert_eq!(read_era1.group.blocks[0].total_difficulty.value, U256::from(1000 * 1000));
        assert_eq!(read_era1.group.blocks[1].total_difficulty.value, U256::from(1001 * 1000));

        // Component bytes follow the number-derived fill patterns.
        assert_eq!(read_era1.group.blocks[0].header.data, vec![(start_block % 256) as u8; 32]);
        assert_eq!(read_era1.group.blocks[0].body.data, vec![((start_block + 1) % 256) as u8; 64]);
        assert_eq!(
            read_era1.group.blocks[0].receipts.data,
            vec![((start_block + 2) % 256) as u8; 32]
        );

        // Range membership: inclusive on both ends.
        assert!(read_era1.contains_block(1000));
        assert!(read_era1.contains_block(1004));
        assert!(!read_era1.contains_block(999));
        assert!(!read_era1.contains_block(1005));

        let block_1002 = read_era1.get_block_by_number(1002);
        assert!(block_1002.is_some());
        assert_eq!(block_1002.unwrap().header.data, vec![((start_block + 2) % 256) as u8; 32]);

        Ok(())
    }

    /// Writes an Era1 file to disk and reads it back via the path-based API.
    #[test]
    fn test_era1_roundtrip_file() -> Result<(), E2sError> {
        let temp_dir = tempdir().expect("Failed to create temp directory");
        let file_path = temp_dir.path().join("test_roundtrip.era1");

        let era1_file = create_test_era1_file(2000, 3, "mainnet");
        Era1Writer::create(&file_path, &era1_file)?;

        let read_era1 = Era1Reader::open(&file_path, "mainnet")?;

        assert_eq!(read_era1.id.network_name, "mainnet");
        assert_eq!(read_era1.id.start_block, 2000);
        assert_eq!(read_era1.id.block_count, 3);
        assert_eq!(read_era1.group.blocks.len(), 3);

        for i in 0..3 {
            let block_num = 2000 + i as u64;
            let block = read_era1.get_block_by_number(block_num);
            assert!(block.is_some());
            assert_eq!(block.unwrap().header.data, vec![block_num as u8; 32]);
        }

        Ok(())
    }
}