1use super::{ChunkAccumulator, EraBlockWriter, ExportBlock};
4use crate::Ere;
5use alloy_consensus::{BlockHeader, TxType};
6use alloy_primitives::{B256, U256};
7use alloy_rlp::Encodable;
8use eyre::{eyre, Result};
9use reth_era::{
10 common::file_ops::{EraFileFormat, EraFileId, StreamWriter},
11 e2s::types::Header,
12 ere::{
13 file::{EreFile, EreWriter},
14 types::{
15 execution::{
16 Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedSlimReceipts,
17 HeaderRecord, SlimReceipt, TotalDifficulty, MAX_BLOCKS_PER_ERE,
18 },
19 group::{DynamicBlockIndex, EreGroup, EreId, EreProfile},
20 },
21 },
22};
23use reth_primitives_traits::Receipt;
24use std::path::{Path, PathBuf};
25
26impl EraBlockWriter for Ere {
27 fn write_file<H, B, R>(
28 network: &str,
29 max_blocks_per_file: u64,
30 blocks: &[ExportBlock<H, B, R>],
31 dir: &Path,
32 ) -> Result<PathBuf>
33 where
34 H: BlockHeader + Encodable,
35 B: Encodable,
36 R: Receipt,
37 {
38 let pre_merge = !blocks[0].header.difficulty().is_zero();
42 let pre_merge_count = blocks.partition_point(|b| !b.header.difficulty().is_zero());
46
47 let tuples = blocks
48 .iter()
49 .map(|block| compress_block(block, pre_merge))
50 .collect::<Result<Vec<_>>>()?;
51 let accumulator = pre_merge
52 .then(|| super::accumulator::<Accumulator, _, _, _>(&blocks[..pre_merge_count]))
53 .transpose()?;
54 let id = file_id(network, max_blocks_per_file, blocks)?;
55 let index = block_index(blocks[0].header.number(), &tuples, accumulator.as_ref());
56
57 let file_path = dir.join(id.to_file_name());
58 let group = EreGroup::new(tuples, accumulator, index);
59
60 EreWriter::new(std::fs::File::create(&file_path)?)
61 .write_file(&EreFile::new(group, id))
62 .map_err(|e| eyre!("Failed to write ERE file {file_path:?}: {e}"))?;
63
64 Ok(file_path)
65 }
66}
67
68fn compress_block<H, B, R>(
74 block: &ExportBlock<H, B, R>,
75 include_total_difficulty: bool,
76) -> Result<BlockTuple>
77where
78 H: BlockHeader + Encodable,
79 B: Encodable,
80 R: Receipt,
81{
82 let header = CompressedHeader::from_header(&block.header)?;
83 let body = CompressedBody::from_body(&block.body)?;
84
85 let slim_receipts = block
86 .receipts
87 .iter()
88 .map(|receipt| {
89 Ok(SlimReceipt {
90 tx_type: TxType::try_from(receipt.ty())
91 .map_err(|e| eyre!("Unexpected transaction type in receipt: {e}"))?,
92 status: receipt.status_or_post_state(),
93 cumulative_gas_used: receipt.cumulative_gas_used(),
94 logs: receipt.logs().to_vec(),
95 })
96 })
97 .collect::<Result<Vec<_>>>()?;
98 let receipts = CompressedSlimReceipts::from_receipts(&slim_receipts)
99 .map_err(|e| eyre!("Failed to compress receipts: {e}"))?;
100
101 let tuple = BlockTuple::new(header, body).with_receipts(receipts);
102 Ok(if include_total_difficulty {
103 tuple.with_total_difficulty(TotalDifficulty::new(block.total_difficulty))
104 } else {
105 tuple
106 })
107}
108
109impl ChunkAccumulator for Accumulator {
110 fn from_pairs(records: &[(B256, U256)]) -> Result<Self> {
111 let records: Vec<HeaderRecord> = records
112 .iter()
113 .map(|&(block_hash, total_difficulty)| HeaderRecord { block_hash, total_difficulty })
114 .collect();
115 Self::from_header_records(&records).map_err(|e| eyre!("Failed to compute accumulator: {e}"))
116 }
117}
118
119fn file_id<H: BlockHeader, B, R>(
123 network: &str,
124 max_blocks_per_file: u64,
125 blocks: &[ExportBlock<H, B, R>],
126) -> Result<EreId> {
127 let last_block_hash =
128 blocks.last().ok_or_else(|| eyre!("cannot build ERE file id from empty block range"))?;
129 let file_hash = super::short_hash(last_block_hash.block_hash);
130 let id = EreId::new(network, blocks[0].header.number(), blocks.len() as u32)
131 .with_hash(file_hash)
132 .with_profile(EreProfile::NoProofs);
133 Ok(if max_blocks_per_file == MAX_BLOCKS_PER_ERE as u64 { id } else { id.with_era_count() })
135}
136
137fn block_index(
144 start_block: u64,
145 tuples: &[BlockTuple],
146 accumulator: Option<&Accumulator>,
147) -> DynamicBlockIndex {
148 let has_total_difficulty = tuples.first().is_some_and(|t| t.total_difficulty.is_some());
151 let component_count = if has_total_difficulty { 4 } else { 3 };
152
153 let mut position = Header::SIZE as i64;
156 let mut section = |sizes: &mut dyn Iterator<Item = i64>| {
157 let mut starts = Vec::with_capacity(tuples.len());
158 for size in sizes {
159 starts.push(position);
160 position += size;
161 }
162 starts
163 };
164
165 let header_pos = section(&mut tuples.iter().map(|t| t.header.to_entry().size() as i64));
166 let body_pos = section(&mut tuples.iter().map(|t| t.body.to_entry().size() as i64));
167 let receipts_pos =
168 section(&mut tuples.iter().map(|t| entry_size(t.receipts.as_ref().map(|r| r.to_entry()))));
169 let difficulty_pos = has_total_difficulty.then(|| {
170 section(
171 &mut tuples
172 .iter()
173 .map(|t| entry_size(t.total_difficulty.as_ref().map(|d| d.to_entry()))),
174 )
175 });
176
177 if let Some(accumulator) = accumulator {
178 position += accumulator.to_entry().size() as i64;
179 }
180 let index_position = position;
181
182 let mut offsets = Vec::with_capacity(tuples.len() * component_count as usize);
183 for i in 0..tuples.len() {
184 offsets.push(header_pos[i] - index_position);
185 offsets.push(body_pos[i] - index_position);
186 offsets.push(receipts_pos[i] - index_position);
187 if let Some(difficulty_pos) = &difficulty_pos {
188 offsets.push(difficulty_pos[i] - index_position);
189 }
190 }
191
192 DynamicBlockIndex::new(start_block, component_count, offsets)
193}
194
195fn entry_size(entry: Option<reth_era::e2s::types::Entry>) -> i64 {
197 entry.map_or(0, |e| e.size() as i64)
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::Header;
    use reth_era::{common::file_ops::StreamReader, ere::file::EreReader};
    use reth_ethereum_primitives::{BlockBody, Receipt as EthReceipt};
    use tempfile::tempdir;

    /// Shorthand for the concrete block type used by these tests.
    type TestBlock = ExportBlock<Header, BlockBody, EthReceipt>;

    /// Builds an empty block with the given number and difficulty.
    ///
    /// The block hash is the byte `number + 1` repeated, and the total difficulty is
    /// `number + 1`, so both differ from block to block.
    fn export_block(number: u64, difficulty: U256) -> ExportBlock<Header, BlockBody, EthReceipt> {
        let header = Header { number, difficulty, ..Default::default() };
        let hash_byte = number as u8 + 1;
        ExportBlock {
            header,
            block_hash: B256::repeat_byte(hash_byte),
            body: BlockBody::default(),
            receipts: vec![],
            total_difficulty: U256::from(number + 1),
        }
    }

    /// Builds blocks `0..count`, all with the same difficulty.
    fn export_blocks(
        count: u64,
        difficulty: U256,
    ) -> Vec<ExportBlock<Header, BlockBody, EthReceipt>> {
        let mut blocks = Vec::new();
        for number in 0..count {
            blocks.push(export_block(number, difficulty));
        }
        blocks
    }

    /// Writes `blocks` to a temporary directory and reads the resulting file back.
    fn write_and_read(blocks: &[TestBlock]) -> EreFile {
        // The temporary directory must stay alive until the file has been read back.
        let dir = tempdir().unwrap();
        let written =
            Ere::write_file("mainnet", MAX_BLOCKS_PER_ERE as u64, blocks, dir.path()).unwrap();
        let handle = std::fs::File::open(written).unwrap();
        EreReader::new(handle).read(String::from("mainnet")).unwrap()
    }

    #[test]
    fn pre_merge_file_carries_total_difficulty_and_accumulator() {
        let blocks = export_blocks(3, U256::from(1));
        let file = write_and_read(&blocks);
        let group = &file.group;

        assert!(group.accumulator.is_some());
        assert_eq!(group.index.component_count(), 4);
        assert!(!group.blocks.iter().any(|b| b.total_difficulty.is_none()));
    }

    #[test]
    fn post_merge_file_omits_total_difficulty_and_accumulator() {
        let blocks = export_blocks(3, U256::ZERO);
        let file = write_and_read(&blocks);
        let group = &file.group;

        assert!(group.accumulator.is_none());
        assert_eq!(group.index.component_count(), 3);
        assert!(!group.blocks.iter().any(|b| b.total_difficulty.is_some()));
    }

    #[test]
    fn merge_spanning_file_excludes_post_merge_blocks_from_accumulator() {
        // Blocks 0 and 1 have non-zero difficulty; blocks 2 and 3 have zero difficulty.
        let mut blocks = Vec::new();
        for n in 0..4 {
            let difficulty = if n < 2 { U256::from(1) } else { U256::ZERO };
            blocks.push(export_block(n, difficulty));
        }
        let file = write_and_read(&blocks);

        // The first block is pre-merge, so every tuple still carries a total difficulty.
        assert_eq!(file.group.index.component_count(), 4);
        assert!(!file.group.blocks.iter().any(|b| b.total_difficulty.is_none()));

        // The expected accumulator covers only the two non-zero-difficulty blocks.
        let pre_merge_records: Vec<_> = blocks[..2]
            .iter()
            .map(|b| HeaderRecord {
                block_hash: b.block_hash,
                total_difficulty: b.total_difficulty,
            })
            .collect();
        let expected = Accumulator::from_header_records(&pre_merge_records).unwrap();
        assert_eq!(file.group.accumulator.unwrap().root, expected.root);
    }
}