// reth_era_utils — export.rs
1//! Logic to export from database era1 block history
2//! and injecting them into era1 files with `Era1Writer`.
3
4use crate::calculate_td_by_number;
5use alloy_consensus::{BlockHeader, Sealable, TxReceipt};
6use alloy_primitives::{BlockNumber, U256};
7use eyre::{eyre, Result};
8use reth_era::{
9    common::file_ops::{EraFileId, StreamWriter},
10    e2s::types::IndexEntry,
11    era1::{
12        file::Era1Writer,
13        types::{
14            execution::{
15                Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
16                HeaderRecord, TotalDifficulty, MAX_BLOCKS_PER_ERA1,
17            },
18            group::{BlockIndex, Era1Id},
19        },
20    },
21};
22use reth_fs_util as fs;
23use reth_primitives_traits::Block;
24use reth_storage_api::{BlockNumReader, BlockReader, HeaderProvider};
25use std::{
26    path::PathBuf,
27    time::{Duration, Instant},
28};
29use tracing::{debug, info, warn};
30
/// How often export progress is logged, in seconds.
const REPORT_INTERVAL_SECS: u64 = 10;
/// Byte overhead of a single e2store entry header (see the size
/// accounting in `export`, which adds this to every compressed payload).
const ENTRY_HEADER_SIZE: usize = 8;
/// Size reserved by the version record at the start of a file;
/// presumably the version record is a bare entry header with no payload — TODO confirm against `Era1Writer::write_version`.
const VERSION_ENTRY_SIZE: usize = ENTRY_HEADER_SIZE;
34
35/// Configuration to export block history
36/// to era1 files
37#[derive(Clone, Debug)]
38pub struct ExportConfig {
39    /// Directory to export era1 files to
40    pub dir: PathBuf,
41    /// First block to export
42    pub first_block_number: BlockNumber,
43    /// Last block to export
44    pub last_block_number: BlockNumber,
45    /// Number of blocks per era1 file
46    /// It can never be larger than `MAX_BLOCKS_PER_ERA1 = 8192`
47    /// See also <`https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era1.md`>
48    pub max_blocks_per_file: u64,
49    /// Network name.
50    pub network: String,
51}
52
53impl Default for ExportConfig {
54    fn default() -> Self {
55        Self {
56            dir: PathBuf::new(),
57            first_block_number: 0,
58            last_block_number: (MAX_BLOCKS_PER_ERA1 - 1) as u64,
59            max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64,
60            network: "mainnet".to_string(),
61        }
62    }
63}
64
65impl ExportConfig {
66    /// Validates the export configuration parameters
67    pub fn validate(&self) -> Result<()> {
68        if self.max_blocks_per_file > MAX_BLOCKS_PER_ERA1 as u64 {
69            return Err(eyre!(
70                "Max blocks per file ({}) exceeds ERA1 limit ({})",
71                self.max_blocks_per_file,
72                MAX_BLOCKS_PER_ERA1
73            ));
74        }
75
76        if self.max_blocks_per_file == 0 {
77            return Err(eyre!("Max blocks per file cannot be zero"));
78        }
79
80        Ok(())
81    }
82}
83
84/// Fetches block history data from the provider
85/// and prepares it for export to era1 files
86/// for a given number of blocks then writes them to disk.
/// Fetches block history data from the provider
/// and prepares it for export to era1 files
/// for a given number of blocks then writes them to disk.
///
/// Returns the paths of all era1 files that were created.
pub fn export<P>(provider: &P, config: &ExportConfig) -> Result<Vec<PathBuf>>
where
    P: BlockReader,
{
    config.validate()?;
    info!(
        "Exporting blockchain history from block {} to {} with this max of blocks per file of {}",
        config.first_block_number, config.last_block_number, config.max_blocks_per_file
    );

    // Determine the actual last block to export
    // best_block_number() might be outdated, so check actual block availability
    let last_block_number = determine_export_range(provider, config)?;

    info!(
        target: "era::history::export",
        first = config.first_block_number,
        last = last_block_number,
        max_blocks_per_file = config.max_blocks_per_file,
        "Preparing era1 export data"
    );

    // Make sure the output directory exists before creating any files.
    if !config.dir.exists() {
        fs::create_dir_all(&config.dir)
            .map_err(|e| eyre!("Failed to create output directory: {}", e))?;
    }

    let start_time = Instant::now();
    let mut last_report_time = Instant::now();
    let report_interval = Duration::from_secs(REPORT_INTERVAL_SECS);

    let mut created_files = Vec::new();
    let mut total_blocks_processed = 0;

    // Seed the running total difficulty with the TD of the block *before*
    // the first exported block (zero when exporting from genesis).
    let mut total_difficulty = if config.first_block_number > 0 {
        let prev_block_number = config.first_block_number - 1;
        calculate_td_by_number(provider, prev_block_number)?
    } else {
        U256::ZERO
    };

    // Process blocks in chunks according to `max_blocks_per_file`
    for start_block in
        (config.first_block_number..=last_block_number).step_by(config.max_blocks_per_file as usize)
    {
        // The final chunk may be shorter than `max_blocks_per_file`.
        let end_block = (start_block + config.max_blocks_per_file - 1).min(last_block_number);
        let block_count = (end_block - start_block + 1) as usize;

        info!(
            target: "era::history::export",
            "Processing blocks {start_block} to {end_block} ({block_count} blocks)"
        );

        let headers = provider.headers_range(start_block..=end_block)?;

        // Pre-compute accumulator from headers to determine filename.
        // `precompute_td` is a scratch copy: the real running TD is advanced
        // later, inside `compress_block_data`, one block at a time.
        let mut precompute_td = total_difficulty;
        let header_records: Vec<HeaderRecord> = headers
            .iter()
            .map(|h| {
                precompute_td += h.difficulty();
                HeaderRecord { block_hash: h.hash_slow(), total_difficulty: precompute_td }
            })
            .collect();
        let accumulator = Accumulator::from_header_records(&header_records)
            .map_err(|e| eyre!("Failed to compute accumulator: {e}"))?;
        // The file name embeds the first 4 bytes of the accumulator root;
        // the slice-to-array conversion cannot fail for a fixed 4-byte take.
        let file_hash: [u8; 4] = accumulator.root[..4].try_into().unwrap();

        let era1_id =
            Era1Id::new(&config.network, start_block, block_count as u32).with_hash(file_hash);

        // Full-size chunks use the plain era1 naming scheme; any other chunk
        // size gets an explicit era-count suffix in the file name.
        let era1_id = if config.max_blocks_per_file == MAX_BLOCKS_PER_ERA1 as u64 {
            era1_id
        } else {
            era1_id.with_era_count()
        };

        debug!("Final file name {}", era1_id.to_file_name());
        let file_path = config.dir.join(era1_id.to_file_name());
        let file = std::fs::File::create(&file_path)?;
        let mut writer = Era1Writer::new(file);
        writer.write_version()?;

        // Absolute byte offset of each block's first entry; the version
        // record written above occupies the first `VERSION_ENTRY_SIZE` bytes.
        let mut offsets = Vec::<i64>::with_capacity(block_count);
        let mut position = VERSION_ENTRY_SIZE as i64;
        let mut blocks_written = 0;

        for (i, header) in headers.into_iter().enumerate() {
            let expected_block_number = start_block + i as u64;

            // Compress header/body/receipts for this block; this also
            // advances `total_difficulty` by the block's difficulty.
            let (compressed_header, compressed_body, compressed_receipts) = compress_block_data(
                provider,
                header,
                expected_block_number,
                &mut total_difficulty,
            )?;

            let difficulty = TotalDifficulty::new(total_difficulty);

            // Each component is written as its own entry, so every size
            // includes the fixed entry-header overhead.
            let header_size = compressed_header.data.len() + ENTRY_HEADER_SIZE;
            let body_size = compressed_body.data.len() + ENTRY_HEADER_SIZE;
            let receipts_size = compressed_receipts.data.len() + ENTRY_HEADER_SIZE;
            let difficulty_size = 32 + ENTRY_HEADER_SIZE; // U256 is 32 + 8 bytes header overhead
            let total_size = (header_size + body_size + receipts_size + difficulty_size) as i64;

            let block_tuple = BlockTuple::new(
                compressed_header,
                compressed_body,
                compressed_receipts,
                difficulty,
            );

            // Record where this block starts, then advance past it.
            offsets.push(position);
            position += total_size;

            writer.write_block(&block_tuple)?;
            blocks_written += 1;
            total_blocks_processed += 1;

            // Periodic progress report so long-running exports stay observable.
            if last_report_time.elapsed() >= report_interval {
                info!(
                    target: "era::history::export",
                    "Export progress: block {expected_block_number}/{last_block_number} ({:.2}%) - elapsed: {:?}",
                    (total_blocks_processed as f64) /
                        ((last_block_number - config.first_block_number + 1) as f64) *
                        100.0,
                    start_time.elapsed()
                );
                last_report_time = Instant::now();
            }
        }
        if blocks_written > 0 {
            // Convert absolute offsets to relative (measured from block-index entry start)
            // NOTE(review): assumes the accumulator entry is header + 32-byte root,
            // matching the `accumulator.root[..4]` usage above — confirm against the writer.
            let accumulator_entry_size = (ENTRY_HEADER_SIZE + 32) as i64;
            let block_index_position = position + accumulator_entry_size;
            let relative_offsets: Vec<i64> =
                offsets.iter().map(|&abs| abs - block_index_position).collect();
            let block_index = BlockIndex::new(start_block, relative_offsets);

            writer.write_accumulator(&accumulator)?;
            writer.write_block_index(&block_index)?;
            writer.flush()?;

            info!(
                target: "era::history::export",
                "Wrote ERA1 file: {file_path:?} with {blocks_written} blocks"
            );
            created_files.push(file_path);
        }
    }

    info!(
        target: "era::history::export",
        "Successfully wrote {} ERA1 files in {:?}",
        created_files.len(),
        start_time.elapsed()
    );

    Ok(created_files)
}
247
248// Determines the actual last block number that can be exported,
249// Uses `headers_range` fallback when `best_block_number` is stale due to static file storage.
250fn determine_export_range<P>(provider: &P, config: &ExportConfig) -> Result<BlockNumber>
251where
252    P: HeaderProvider + BlockNumReader,
253{
254    let best_block_number = provider.best_block_number()?;
255
256    let last_block_number = if best_block_number < config.last_block_number {
257        warn!(
258            "Last block {} is beyond current head {}, setting last = head",
259            config.last_block_number, best_block_number
260        );
261
262        // Check if more blocks are actually available beyond what `best_block_number()` reports
263        if let Ok(headers) = provider.headers_range(best_block_number..=config.last_block_number) {
264            if let Some(last_header) = headers.last() {
265                let highest_block = last_header.number();
266                info!("Found highest available block {} via headers_range", highest_block);
267                highest_block
268            } else {
269                warn!("No headers found in range, using best_block_number {}", best_block_number);
270                best_block_number
271            }
272        } else {
273            warn!("headers_range failed, using best_block_number {}", best_block_number);
274            best_block_number
275        }
276    } else {
277        config.last_block_number
278    };
279
280    Ok(last_block_number)
281}
282
283// Compresses block data and returns compressed components with metadata
284fn compress_block_data<P>(
285    provider: &P,
286    header: P::Header,
287    expected_block_number: BlockNumber,
288    total_difficulty: &mut U256,
289) -> Result<(CompressedHeader, CompressedBody, CompressedReceipts)>
290where
291    P: BlockReader,
292{
293    let actual_block_number = header.number();
294
295    if expected_block_number != actual_block_number {
296        return Err(eyre!("Expected block {expected_block_number}, got {actual_block_number}"));
297    }
298
299    // CompressedBody must contain the block *body* (rlp(body)), not the full block (rlp(block)).
300    let body = provider
301        .block_by_number(actual_block_number)?
302        .ok_or_else(|| eyre!("Block not found for block {}", actual_block_number))?
303        .into_body();
304
305    let receipts = provider
306        .receipts_by_block(actual_block_number.into())?
307        .ok_or_else(|| eyre!("Receipts not found for block {}", actual_block_number))?;
308
309    *total_difficulty += header.difficulty();
310
311    let compressed_header = CompressedHeader::from_header(&header)?;
312    let compressed_body = CompressedBody::from_body(&body)?;
313    let receipts_with_bloom: Vec<_> =
314        receipts.iter().map(|r| TxReceipt::with_bloom_ref(r)).collect();
315    let compressed_receipts = CompressedReceipts::from_encodable_list(&receipts_with_bloom)
316        .map_err(|e| eyre!("Failed to compress receipts: {}", e))?;
317
318    Ok((compressed_header, compressed_body, compressed_receipts))
319}
320
#[cfg(test)]
mod tests {
    use crate::ExportConfig;
    use reth_era::era1::types::execution::MAX_BLOCKS_PER_ERA1;
    use tempfile::tempdir;

    #[test]
    fn test_export_config_validation() {
        let temp_dir = tempdir().unwrap();

        // The default configuration is expected to be well-formed.
        let default_config = ExportConfig::default();
        assert!(default_config.validate().is_ok(), "Default config should be valid");

        // A chunk size exactly at the era1 maximum is still accepted.
        let limit_config =
            ExportConfig { max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64, ..Default::default() };
        assert!(limit_config.validate().is_ok(), "Config at ERA1 limit should pass validation");

        // An arbitrary chunk size below the limit is accepted too.
        let valid_config = ExportConfig {
            dir: temp_dir.path().to_path_buf(),
            max_blocks_per_file: 1000,
            ..Default::default()
        };
        assert!(valid_config.validate().is_ok(), "Valid config should pass validation");

        // A chunk size of zero must be rejected with the matching message.
        let zero_blocks_config = ExportConfig { max_blocks_per_file: 0, ..Default::default() };
        let result = zero_blocks_config.validate();
        assert!(result.is_err(), "Zero blocks per file should fail validation");
        assert!(result.unwrap_err().to_string().contains("cannot be zero"));

        // A chunk size above the era1 maximum must be rejected as well.
        let oversized_config = ExportConfig {
            max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64 + 1,
            ..Default::default()
        };
        let result = oversized_config.validate();
        assert!(result.is_err(), "Oversized blocks per file should fail validation");
        assert!(result.unwrap_err().to_string().contains("exceeds ERA1 limit"));
    }
}