Skip to main content

reth_era_utils/export/
ere.rs

1//! `.ere` block-history writer.
2
3use super::{ChunkAccumulator, EraBlockWriter, ExportBlock};
4use crate::Ere;
5use alloy_consensus::{BlockHeader, TxType};
6use alloy_primitives::{B256, U256};
7use alloy_rlp::Encodable;
8use eyre::{eyre, Result};
9use reth_era::{
10    common::file_ops::{EraFileFormat, EraFileId, StreamWriter},
11    e2s::types::Header,
12    ere::{
13        file::{EreFile, EreWriter},
14        types::{
15            execution::{
16                Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedSlimReceipts,
17                HeaderRecord, SlimReceipt, TotalDifficulty, MAX_BLOCKS_PER_ERE,
18            },
19            group::{DynamicBlockIndex, EreGroup, EreId, EreProfile},
20        },
21    },
22};
23use reth_primitives_traits::Receipt;
24use std::path::{Path, PathBuf};
25
26impl EraBlockWriter for Ere {
27    fn write_file<H, B, R>(
28        network: &str,
29        max_blocks_per_file: u64,
30        blocks: &[ExportBlock<H, B, R>],
31        dir: &Path,
32    ) -> Result<PathBuf>
33    where
34        H: BlockHeader + Encodable,
35        B: Encodable,
36        R: Receipt,
37    {
38        // Total difficulty and the accumulator are pre-merge only: post-merge blocks have zero
39        // difficulty and the accumulator is frozen at the merge. Difficulty drops to zero
40        // monotonically, so the first block decides whether the file carries them at all.
41        let pre_merge = !blocks[0].header.difficulty().is_zero();
42        // Post-merge blocks are not part of any epoch accumulator, so a merge-spanning file builds
43        // its accumulator from the pre-merge prefix only, while total difficulty is kept for every
44        // block.
45        let pre_merge_count = blocks.partition_point(|b| !b.header.difficulty().is_zero());
46
47        let tuples = blocks
48            .iter()
49            .map(|block| compress_block(block, pre_merge))
50            .collect::<Result<Vec<_>>>()?;
51        let accumulator = pre_merge
52            .then(|| super::accumulator::<Accumulator, _, _, _>(&blocks[..pre_merge_count]))
53            .transpose()?;
54        let id = file_id(network, max_blocks_per_file, blocks)?;
55        let index = block_index(blocks[0].header.number(), &tuples, accumulator.as_ref());
56
57        let file_path = dir.join(id.to_file_name());
58        let group = EreGroup::new(tuples, accumulator, index);
59
60        EreWriter::new(std::fs::File::create(&file_path)?)
61            .write_file(&EreFile::new(group, id))
62            .map_err(|e| eyre!("Failed to write ERE file {file_path:?}: {e}"))?;
63
64        Ok(file_path)
65    }
66}
67
68/// Compresses one block into an `ere` [`BlockTuple`] (header, body, slim receipts, and, for
69/// pre-merge blocks, cumulative total difficulty).
70///
71/// The optional ERE `Proof` (a Portal Network header-inclusion proof) is not produced here, so the
72/// file carries the [`EreProfile::NoProofs`] profile.
73fn compress_block<H, B, R>(
74    block: &ExportBlock<H, B, R>,
75    include_total_difficulty: bool,
76) -> Result<BlockTuple>
77where
78    H: BlockHeader + Encodable,
79    B: Encodable,
80    R: Receipt,
81{
82    let header = CompressedHeader::from_header(&block.header)?;
83    let body = CompressedBody::from_body(&block.body)?;
84
85    let slim_receipts = block
86        .receipts
87        .iter()
88        .map(|receipt| {
89            Ok(SlimReceipt {
90                tx_type: TxType::try_from(receipt.ty())
91                    .map_err(|e| eyre!("Unexpected transaction type in receipt: {e}"))?,
92                status: receipt.status_or_post_state(),
93                cumulative_gas_used: receipt.cumulative_gas_used(),
94                logs: receipt.logs().to_vec(),
95            })
96        })
97        .collect::<Result<Vec<_>>>()?;
98    let receipts = CompressedSlimReceipts::from_receipts(&slim_receipts)
99        .map_err(|e| eyre!("Failed to compress receipts: {e}"))?;
100
101    let tuple = BlockTuple::new(header, body).with_receipts(receipts);
102    Ok(if include_total_difficulty {
103        tuple.with_total_difficulty(TotalDifficulty::new(block.total_difficulty))
104    } else {
105        tuple
106    })
107}
108
109impl ChunkAccumulator for Accumulator {
110    fn from_pairs(records: &[(B256, U256)]) -> Result<Self> {
111        let records: Vec<HeaderRecord> = records
112            .iter()
113            .map(|&(block_hash, total_difficulty)| HeaderRecord { block_hash, total_difficulty })
114            .collect();
115        Self::from_header_records(&records).map_err(|e| eyre!("Failed to compute accumulator: {e}"))
116    }
117}
118
119/// Builds the file identifier.
120///
121/// Per the [`EreId`] contract, the short hash is the first four bytes of the last block's hash.
122fn file_id<H: BlockHeader, B, R>(
123    network: &str,
124    max_blocks_per_file: u64,
125    blocks: &[ExportBlock<H, B, R>],
126) -> Result<EreId> {
127    let last_block_hash =
128        blocks.last().ok_or_else(|| eyre!("cannot build ERE file id from empty block range"))?;
129    let file_hash = super::short_hash(last_block_hash.block_hash);
130    let id = EreId::new(network, blocks[0].header.number(), blocks.len() as u32)
131        .with_hash(file_hash)
132        .with_profile(EreProfile::NoProofs);
133    // Custom block-per-file exports tag the era count into the filename.
134    Ok(if max_blocks_per_file == MAX_BLOCKS_PER_ERE as u64 { id } else { id.with_era_count() })
135}
136
137/// Builds the [`DynamicBlockIndex`] for the file's sectioned layout.
138///
139/// `ere` groups records by type, so the file is laid out (after the version record) as: all
140/// headers, all bodies, all receipts, all total-difficulties and the accumulator (both pre-merge
141/// only), then the index. Offsets are negative `i64`s relative to the index record, per the spec's
142/// backward-pointing convention.
143fn block_index(
144    start_block: u64,
145    tuples: &[BlockTuple],
146    accumulator: Option<&Accumulator>,
147) -> DynamicBlockIndex {
148    // Every block carries header + body + receipts, plus total-difficulty for pre-merge files
149    // (proofs are always omitted), so the index stores three or four offsets per block.
150    let has_total_difficulty = tuples.first().is_some_and(|t| t.total_difficulty.is_some());
151    let component_count = if has_total_difficulty { 4 } else { 3 };
152
153    // Absolute position of each block's components, walked section by section, starting past the
154    // leading version record.
155    let mut position = Header::SIZE as i64;
156    let mut section = |sizes: &mut dyn Iterator<Item = i64>| {
157        let mut starts = Vec::with_capacity(tuples.len());
158        for size in sizes {
159            starts.push(position);
160            position += size;
161        }
162        starts
163    };
164
165    let header_pos = section(&mut tuples.iter().map(|t| t.header.to_entry().size() as i64));
166    let body_pos = section(&mut tuples.iter().map(|t| t.body.to_entry().size() as i64));
167    let receipts_pos =
168        section(&mut tuples.iter().map(|t| entry_size(t.receipts.as_ref().map(|r| r.to_entry()))));
169    let difficulty_pos = has_total_difficulty.then(|| {
170        section(
171            &mut tuples
172                .iter()
173                .map(|t| entry_size(t.total_difficulty.as_ref().map(|d| d.to_entry()))),
174        )
175    });
176
177    if let Some(accumulator) = accumulator {
178        position += accumulator.to_entry().size() as i64;
179    }
180    let index_position = position;
181
182    let mut offsets = Vec::with_capacity(tuples.len() * component_count as usize);
183    for i in 0..tuples.len() {
184        offsets.push(header_pos[i] - index_position);
185        offsets.push(body_pos[i] - index_position);
186        offsets.push(receipts_pos[i] - index_position);
187        if let Some(difficulty_pos) = &difficulty_pos {
188            offsets.push(difficulty_pos[i] - index_position);
189        }
190    }
191
192    DynamicBlockIndex::new(start_block, component_count, offsets)
193}
194
195/// Serialized size of an optional record, or `0` when absent.
196fn entry_size(entry: Option<reth_era::e2s::types::Entry>) -> i64 {
197    entry.map_or(0, |e| e.size() as i64)
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::Header;
    use reth_era::{common::file_ops::StreamReader, ere::file::EreReader};
    use reth_ethereum_primitives::{BlockBody, Receipt as EthReceipt};
    use tempfile::tempdir;

    /// Concrete block type exported by every test in this module.
    type TestBlock = ExportBlock<Header, BlockBody, EthReceipt>;

    /// Builds a block without transactions or receipts. A zero `difficulty` marks the block as
    /// post-merge.
    fn export_block(number: u64, difficulty: U256) -> TestBlock {
        let header = Header { number, difficulty, ..Default::default() };
        ExportBlock {
            header,
            block_hash: B256::repeat_byte(number as u8 + 1),
            body: BlockBody::default(),
            receipts: Vec::new(),
            total_difficulty: U256::from(number + 1),
        }
    }

    /// Builds blocks `0..count` that all share `difficulty`, giving a chunk that is entirely
    /// pre-merge or entirely post-merge.
    fn export_blocks(count: u64, difficulty: U256) -> Vec<TestBlock> {
        let mut blocks = Vec::new();
        for number in 0..count {
            blocks.push(export_block(number, difficulty));
        }
        blocks
    }

    /// Writes `blocks` into a temporary directory and reads the resulting file back.
    fn write_and_read(blocks: &[TestBlock]) -> EreFile {
        let out_dir = tempdir().unwrap();
        let written =
            Ere::write_file("mainnet", MAX_BLOCKS_PER_ERE as u64, blocks, out_dir.path()).unwrap();
        let handle = std::fs::File::open(written).unwrap();
        EreReader::new(handle).read("mainnet".to_string()).unwrap()
    }

    #[test]
    fn pre_merge_file_carries_total_difficulty_and_accumulator() {
        let blocks = export_blocks(3, U256::from(1));
        let file = write_and_read(&blocks);
        let group = &file.group;

        assert!(group.accumulator.is_some());
        assert_eq!(group.index.component_count(), 4);
        assert!(group.blocks.iter().all(|b| b.total_difficulty.is_some()));
    }

    #[test]
    fn post_merge_file_omits_total_difficulty_and_accumulator() {
        let blocks = export_blocks(3, U256::ZERO);
        let file = write_and_read(&blocks);
        let group = &file.group;

        assert!(group.accumulator.is_none());
        assert_eq!(group.index.component_count(), 3);
        assert!(group.blocks.iter().all(|b| b.total_difficulty.is_none()));
    }

    #[test]
    fn merge_spanning_file_excludes_post_merge_blocks_from_accumulator() {
        // The first two blocks are pre-merge (non-zero difficulty); the last two are post-merge
        // (zero difficulty).
        let blocks: Vec<_> = (0..4)
            .map(|n| {
                let difficulty = if n < 2 { U256::from(1) } else { U256::ZERO };
                export_block(n, difficulty)
            })
            .collect();
        let file = write_and_read(&blocks);

        // Every block keeps its total difficulty, so four components are indexed per block.
        assert_eq!(file.group.index.component_count(), 4);
        assert!(file.group.blocks.iter().all(|b| b.total_difficulty.is_some()));

        // Only the pre-merge prefix may feed the accumulator, never the whole chunk.
        let pre_merge_records: Vec<_> = blocks
            .iter()
            .take(2)
            .map(|b| HeaderRecord {
                block_hash: b.block_hash,
                total_difficulty: b.total_difficulty,
            })
            .collect();
        let expected = Accumulator::from_header_records(&pre_merge_records).unwrap();
        assert_eq!(file.group.accumulator.unwrap().root, expected.root);
    }
}